
Merge pull request #36 from chrislusf/master

sync
hilimd 4 years ago
committed by GitHub
commit e0d5207ed9
  1. README.md (15)
  2. docker/Makefile (3)
  3. docker/local-registry-compose.yml (95)
  4. k8s/seaweedfs/Chart.yaml (2)
  5. k8s/seaweedfs/values.yaml (2)
  6. other/java/client/src/main/java/seaweedfs/client/ChunkCache.java (1)
  7. other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java (11)
  8. other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java (36)
  9. other/java/client/src/main/proto/filer.proto (17)
  10. other/java/unzip/pom.xml (32)
  11. other/java/unzip/src/main/java/com/example/test/Example.java (46)
  12. weed/command/benchmark.go (2)
  13. weed/command/filer.go (2)
  14. weed/command/filer_copy.go (2)
  15. weed/filer/filer.go (4)
  16. weed/filer/leveldb/leveldb_store_test.go (4)
  17. weed/filer/leveldb2/leveldb2_store_test.go (4)
  18. weed/operation/upload_content.go (3)
  19. weed/pb/filer.proto (17)
  20. weed/pb/filer_pb/filer.pb.go (473)
  21. weed/pb/master.proto (6)
  22. weed/pb/master_pb/master.pb.go (1001)
  23. weed/s3api/auth_credentials.go (44)
  24. weed/s3api/filer_util.go (6)
  25. weed/s3api/http/header.go (6)
  26. weed/s3api/s3api_bucket_handlers.go (80)
  27. weed/s3api/s3api_object_copy_handlers.go (3)
  28. weed/s3api/s3api_object_handlers.go (2)
  29. weed/s3api/s3api_object_multipart_handlers.go (16)
  30. weed/server/filer_server.go (2)
  31. weed/server/master_grpc_server.go (2)
  32. weed/server/master_server.go (2)
  33. weed/shell/commands.go (2)
  34. weed/topology/store_replicate.go (1)
  35. weed/util/constants.go (2)
  36. weed/util/http_util.go (1)
  37. weed/wdclient/masterclient.go (7)
  38. weed/wdclient/vid_map.go (10)
  39. weed/wdclient/vid_map_test.go (2)

README.md (15)

@@ -58,11 +58,11 @@ Your support will be really appreciated by me and other supporters!
Table of Contents
=================
* [Quick Start](#quick-start)
* [Introduction](#introduction)
* [Features](#features)
* [Additional Features](#additional-features)
* [Filer Features](#filer-features)
* [Quick Start](#quick-start)
* [Example: Using Seaweed Object Store](#example-Using-Seaweed-Object-Store)
* [Architecture](#architecture)
* [Compared to Other File Systems](#compared-to-other-file-systems)
@@ -76,6 +76,13 @@ Table of Contents
* [Benchmark](#Benchmark)
* [License](#license)
## Quick Start ##
* Download the latest binary from https://github.com/chrislusf/seaweedfs/releases and unzip a single binary file `weed` or `weed.exe`
* Run `weed server -dir=/some/data/dir -s3` to start one master, one volume server, one filer, and one S3 gateway.
Also, to increase capacity, just add more volume servers by `weed volume -dir="/some/data/dir2" -mserver="<master_host>:9333" -port=8081` locally or a different machine. That is it!
## Introduction ##
SeaweedFS is a simple and highly scalable distributed file system. There are two objectives:
@@ -142,12 +149,6 @@ On top of the object store, optional [Filer] can support directories and POSIX a
[Back to TOC](#table-of-contents)
## Quick Start ##
* Download the latest binary from https://github.com/chrislusf/seaweedfs/releases and unzip a single binary file `weed` or `weed.exe`
* Run `weed server -dir=/some/data/dir -s3` to start one master, one volume server, one filer, and one S3 gateway.
Also, to increase capacity, just add more volume servers by `weed volume -dir="/some/data/dir2" -mserver="<master_host>:9333" -port=8081` locally or a different machine. That is it!
## Example: Using Seaweed Object Store ##
By default, the master node runs on port 9333, and the volume nodes run on port 8080.

docker/Makefile (3)

@@ -12,6 +12,9 @@ build:
dev: build
docker-compose -f local-dev-compose.yml -p seaweedfs up
dev_registry: build
docker-compose -f local-registry-compose.yml -p seaweedfs up
cluster: build
docker-compose -f local-cluster-compose.yml -p seaweedfs up

docker/local-registry-compose.yml (95)

@@ -0,0 +1,95 @@
version: '2'
services:
  master:
    image: chrislusf/seaweedfs:local
    ports:
      - 9333:9333
      - 19333:19333
    command: "master -ip=master -volumeSizeLimitMB=1024"
  volume:
    image: chrislusf/seaweedfs:local
    ports:
      - 8080:8080
      - 18080:18080
    command: "volume -mserver=master:9333 -port=8080 -ip=volume -max=0 -preStopSeconds=1"
    volumes:
      - type: bind
        source: /Volumes/mobile_disk/data
        target: /data
    depends_on:
      - master
  filer:
    image: chrislusf/seaweedfs:local
    ports:
      - 8888:8888
      - 18888:18888
    command: 'filer -master="master:9333"'
    depends_on:
      - master
      - volume
  s3:
    image: chrislusf/seaweedfs:local
    ports:
      - 8333:8333
    command: '-v 9 s3 -filer="filer:8888"'
    depends_on:
      - master
      - volume
      - filer
  minio:
    image: minio/minio
    ports:
      - 9000:9000
    command: 'minio server /data'
    environment:
      MINIO_ACCESS_KEY: "some_access_key1"
      MINIO_SECRET_KEY: "some_secret_key1"
    depends_on:
      - master
  registry1:
    image: registry:2
    environment:
      REGISTRY_HTTP_ADDR: "0.0.0.0:5001" # seaweedfs s3
      REGISTRY_LOG_LEVEL: "debug"
      REGISTRY_STORAGE: "s3"
      REGISTRY_STORAGE_S3_REGION: "us-east-1"
      REGISTRY_STORAGE_S3_REGIONENDPOINT: "http://s3:8333"
      REGISTRY_STORAGE_S3_BUCKET: "registry"
      REGISTRY_STORAGE_S3_ACCESSKEY: "some_access_key1"
      REGISTRY_STORAGE_S3_SECRETKEY: "some_secret_key1"
      REGISTRY_STORAGE_S3_V4AUTH: "true"
      REGISTRY_STORAGE_S3_SECURE: "false"
      REGISTRY_STORAGE_S3_SKIPVERIFY: "true"
      REGISTRY_STORAGE_S3_ROOTDIRECTORY: "/"
      REGISTRY_STORAGE_DELETE_ENABLED: "true"
      REGISTRY_STORAGE_REDIRECT_DISABLE: "true"
      REGISTRY_VALIDATION_DISABLED: "true"
    ports:
      - 5001:5001
    depends_on:
      - s3
      - minio
  registry2:
    image: registry:2
    environment:
      REGISTRY_HTTP_ADDR: "0.0.0.0:5002" # minio
      REGISTRY_LOG_LEVEL: "debug"
      REGISTRY_STORAGE: "s3"
      REGISTRY_STORAGE_S3_REGION: "us-east-1"
      REGISTRY_STORAGE_S3_REGIONENDPOINT: "http://minio:9000"
      REGISTRY_STORAGE_S3_BUCKET: "registry"
      REGISTRY_STORAGE_S3_ACCESSKEY: "some_access_key1"
      REGISTRY_STORAGE_S3_SECRETKEY: "some_secret_key1"
      REGISTRY_STORAGE_S3_V4AUTH: "true"
      REGISTRY_STORAGE_S3_SECURE: "false"
      REGISTRY_STORAGE_S3_SKIPVERIFY: "true"
      REGISTRY_STORAGE_S3_ROOTDIRECTORY: "/"
      REGISTRY_STORAGE_DELETE_ENABLED: "true"
      REGISTRY_STORAGE_REDIRECT_DISABLE: "true"
      REGISTRY_VALIDATION_DISABLED: "true"
    ports:
      - 5002:5002
    depends_on:
      - s3
      - minio

k8s/seaweedfs/Chart.yaml (2)

@@ -1,4 +1,4 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
version: 2.09
version: 2.10

k8s/seaweedfs/values.yaml (2)

@@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
imageTag: "2.09"
imageTag: "2.10"
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always

other/java/client/src/main/java/seaweedfs/client/ChunkCache.java (1)

@@ -15,7 +15,6 @@ public class ChunkCache {
}
this.cache = CacheBuilder.newBuilder()
.maximumSize(maxEntries)
.weakValues()
.expireAfterAccess(1, TimeUnit.HOURS)
.build();
}

other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java (11)

@@ -19,6 +19,7 @@ public class SeaweedRead {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
static ChunkCache chunkCache = new ChunkCache(4);
static VolumeIdCache volumeIdCache = new VolumeIdCache(4 * 1024);
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
@@ -30,13 +31,19 @@ public class SeaweedRead {
FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
for (ChunkView chunkView : chunkViews) {
String vid = parseVolumeId(chunkView.fileId);
if (volumeIdCache.getLocations(vid)==null){
lookupRequest.addVolumeIds(vid);
}
}
if (lookupRequest.getVolumeIdsCount()>0){
FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
.getBlockingStub().lookupVolume(lookupRequest.build());
Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
for (Map.Entry<String,FilerProto.Locations> entry : vid2Locations.entrySet()) {
volumeIdCache.setLocations(entry.getKey(), entry.getValue());
}
}
//TODO parallel this
long readCount = 0;
@@ -50,7 +57,7 @@ public class SeaweedRead {
startOffset += gap;
}
FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
FilerProto.Locations locations = volumeIdCache.getLocations(parseVolumeId(chunkView.fileId));
if (locations == null || locations.getLocationsCount() == 0) {
LOG.error("failed to locate {}", chunkView.fileId);
// log here!

36
other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java

@@ -0,0 +1,36 @@
package seaweedfs.client;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
public class VolumeIdCache {
private Cache<String, FilerProto.Locations> cache = null;
public VolumeIdCache(int maxEntries) {
if (maxEntries == 0) {
return;
}
this.cache = CacheBuilder.newBuilder()
.maximumSize(maxEntries)
.expireAfterAccess(1, TimeUnit.HOURS)
.build();
}
public FilerProto.Locations getLocations(String volumeId) {
if (this.cache == null) {
return null;
}
return this.cache.getIfPresent(volumeId);
}
public void setLocations(String volumeId, FilerProto.Locations locations) {
if (this.cache == null) {
return;
}
this.cache.put(volumeId, locations);
}
}

other/java/client/src/main/proto/filer.proto (17)

@@ -348,3 +348,20 @@ message KvPutRequest {
message KvPutResponse {
string error = 1;
}
// path-based configurations
message FilerConf {
int32 version = 1;
message PathConf {
string location_prefix = 1;
string collection = 2;
string replication = 3;
string ttl = 4;
enum DiskType {
HDD = 0;
SSD = 1;
}
DiskType disk_type = 5;
}
repeated PathConf locations = 2;
}

other/java/unzip/pom.xml (32)

@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>unzip</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.5.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-hadoop2-client</artifactId>
<version>1.5.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.2</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

other/java/unzip/src/main/java/com/example/test/Example.java (46)

@@ -0,0 +1,46 @@
package com.example.test;
import seaweed.hdfs.SeaweedInputStream;
import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerGrpcClient;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public class Example {
public static FilerClient filerClient = new FilerClient("localhost", 18888);
public static FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888);
public static void main(String[] args) throws IOException {
// local mode is very fast
parseZip("/Users/chris/tmp/test.zip");
// read from SeaweedFS
SeaweedInputStream seaweedInputStream = new SeaweedInputStream(
filerGrpcClient,
new org.apache.hadoop.fs.FileSystem.Statistics(""),
"/",
filerClient.lookupEntry("/", "test.zip")
);
parseZip(seaweedInputStream);
}
public static void parseZip(String filename) throws IOException {
FileInputStream fileInputStream = new FileInputStream(filename);
parseZip(fileInputStream);
}
public static void parseZip(InputStream is) throws IOException {
ZipInputStream zin = new ZipInputStream(is);
ZipEntry ze;
while ((ze = zin.getNextEntry()) != null) {
System.out.println(ze.getName());
}
}
}

weed/command/benchmark.go (2)

@@ -125,7 +125,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, strings.Split(*b.masters, ","))
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, "", strings.Split(*b.masters, ","))
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()

weed/command/filer.go (2)

@@ -59,7 +59,7 @@ func init() {
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
f.maxMB = cmdFiler.Flag.Int("maxMB", 32, "split files larger than the limit")
f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size")
f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center")
f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center")
f.rack = cmdFiler.Flag.String("rack", "", "prefer to write to volumes in this rack")
f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed")
f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")

weed/command/filer_copy.go (2)

@@ -122,7 +122,7 @@ func runCopy(cmd *Command, args []string) bool {
expectedBucket := restPath[:strings.Index(restPath, "/")]
if *copy.collection == "" {
*copy.collection = expectedBucket
} else {
} else if *copy.collection != expectedBucket {
fmt.Printf("destination %s uses collection \"%s\": unexpected collection \"%v\"\n", urlPath, expectedBucket, *copy.collection) fmt.Printf("destination %s uses collection \"%s\": unexpected collection \"%v\"\n", urlPath, expectedBucket, *copy.collection)
return true return true
} }

weed/filer/filer.go (4)

@@ -44,9 +44,9 @@ type Filer struct {
}
func NewFiler(masters []string, grpcDialOption grpc.DialOption,
filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
filerHost string, filerGrpcPort uint32, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
f := &Filer{
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
}

weed/filer/leveldb/leveldb_store_test.go (4)

@@ -11,7 +11,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
@@ -65,7 +65,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDBStore{}

weed/filer/leveldb2/leveldb2_store_test.go (4)

@@ -11,7 +11,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}
@@ -65,7 +65,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}

weed/operation/upload_content.go (3)

@@ -169,7 +169,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
_, err = w.Write(data)
return
}, filename, contentIsGzipped, 0, mtype, pairMap, jwt)
}, filename, contentIsGzipped, len(data), mtype, pairMap, jwt)
}
if uploadResult == nil {
@@ -190,6 +190,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
body_writer := multipart.NewWriter(buf)
h := make(textproto.MIMEHeader)
h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename)))
h.Set("Idempotency-Key", uploadUrl)
if mtype == "" { if mtype == "" {
mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename))) mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename)))
} }

weed/pb/filer.proto (17)

@@ -348,3 +348,20 @@ message KvPutRequest {
message KvPutResponse {
string error = 1;
}
// path-based configurations
message FilerConf {
int32 version = 1;
message PathConf {
string location_prefix = 1;
string collection = 2;
string replication = 3;
string ttl = 4;
enum DiskType {
HDD = 0;
SSD = 1;
}
DiskType disk_type = 5;
}
repeated PathConf locations = 2;
}
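The new FilerConf message adds path-based configuration: each PathConf maps a location prefix to a collection, replication, TTL, and disk type. As a rough sketch of how the generated Go types shown further down in this diff fit together (not code from this commit; the prefix and values are made up):

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/golang/protobuf/proto"
)

func main() {
	// Hypothetical rule: everything under /buckets/archive/ goes to the
	// "archive" collection with 001 replication on HDD. Values are made up.
	conf := &filer_pb.FilerConf{
		Version: 1,
		Locations: []*filer_pb.FilerConf_PathConf{
			{
				LocationPrefix: "/buckets/archive/",
				Collection:     "archive",
				Replication:    "001",
				Ttl:            "30d",
				DiskType:       filer_pb.FilerConf_PathConf_HDD,
			},
		},
	}

	// FilerConf behaves like any other generated message, so it can be
	// marshaled for storage or transport.
	data, err := proto.Marshal(conf)
	if err != nil {
		panic(err)
	}
	fmt.Printf("FilerConf encoded to %d bytes\n", len(data))
}
```

How the filer persists and applies this configuration is not part of the diff shown here.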

weed/pb/filer_pb/filer.pb.go (473)

@@ -29,6 +29,52 @@ const (
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
type FilerConf_PathConf_DiskType int32
const (
FilerConf_PathConf_HDD FilerConf_PathConf_DiskType = 0
FilerConf_PathConf_SSD FilerConf_PathConf_DiskType = 1
)
// Enum value maps for FilerConf_PathConf_DiskType.
var (
FilerConf_PathConf_DiskType_name = map[int32]string{
0: "HDD",
1: "SSD",
}
FilerConf_PathConf_DiskType_value = map[string]int32{
"HDD": 0,
"SSD": 1,
}
)
func (x FilerConf_PathConf_DiskType) Enum() *FilerConf_PathConf_DiskType {
p := new(FilerConf_PathConf_DiskType)
*p = x
return p
}
func (x FilerConf_PathConf_DiskType) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (FilerConf_PathConf_DiskType) Descriptor() protoreflect.EnumDescriptor {
return file_filer_proto_enumTypes[0].Descriptor()
}
func (FilerConf_PathConf_DiskType) Type() protoreflect.EnumType {
return &file_filer_proto_enumTypes[0]
}
func (x FilerConf_PathConf_DiskType) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Use FilerConf_PathConf_DiskType.Descriptor instead.
func (FilerConf_PathConf_DiskType) EnumDescriptor() ([]byte, []int) {
return file_filer_proto_rawDescGZIP(), []int{47, 0, 0}
}
type LookupDirectoryEntryRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -3002,6 +3048,62 @@ func (x *KvPutResponse) GetError() string {
return ""
}
// path-based configurations
type FilerConf struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Version int32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
Locations []*FilerConf_PathConf `protobuf:"bytes,2,rep,name=locations,proto3" json:"locations,omitempty"`
}
func (x *FilerConf) Reset() {
*x = FilerConf{}
if protoimpl.UnsafeEnabled {
mi := &file_filer_proto_msgTypes[47]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *FilerConf) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*FilerConf) ProtoMessage() {}
func (x *FilerConf) ProtoReflect() protoreflect.Message {
mi := &file_filer_proto_msgTypes[47]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use FilerConf.ProtoReflect.Descriptor instead.
func (*FilerConf) Descriptor() ([]byte, []int) {
return file_filer_proto_rawDescGZIP(), []int{47}
}
func (x *FilerConf) GetVersion() int32 {
if x != nil {
return x.Version
}
return 0
}
func (x *FilerConf) GetLocations() []*FilerConf_PathConf {
if x != nil {
return x.Locations
}
return nil
}
// if found, send the exact address
// if not found, send the full list of existing brokers
type LocateBrokerResponse_Resource struct {
@@ -3016,7 +3118,7 @@ type LocateBrokerResponse_Resource struct {
func (x *LocateBrokerResponse_Resource) Reset() {
*x = LocateBrokerResponse_Resource{}
if protoimpl.UnsafeEnabled {
mi := &file_filer_proto_msgTypes[49]
mi := &file_filer_proto_msgTypes[50]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -3029,7 +3131,7 @@ func (x *LocateBrokerResponse_Resource) String() string {
}
func (*LocateBrokerResponse_Resource) ProtoMessage() {}
mi := &file_filer_proto_msgTypes[49]
mi := &file_filer_proto_msgTypes[50]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -3059,6 +3161,85 @@ func (x *LocateBrokerResponse_Resource) GetResourceCount() int32 {
return 0
}
type FilerConf_PathConf struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
LocationPrefix string `protobuf:"bytes,1,opt,name=location_prefix,json=locationPrefix,proto3" json:"location_prefix,omitempty"`
Collection string `protobuf:"bytes,2,opt,name=collection,proto3" json:"collection,omitempty"`
Replication string `protobuf:"bytes,3,opt,name=replication,proto3" json:"replication,omitempty"`
Ttl string `protobuf:"bytes,4,opt,name=ttl,proto3" json:"ttl,omitempty"`
DiskType FilerConf_PathConf_DiskType `protobuf:"varint,5,opt,name=disk_type,json=diskType,proto3,enum=filer_pb.FilerConf_PathConf_DiskType" json:"disk_type,omitempty"`
}
func (x *FilerConf_PathConf) Reset() {
*x = FilerConf_PathConf{}
if protoimpl.UnsafeEnabled {
mi := &file_filer_proto_msgTypes[51]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *FilerConf_PathConf) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*FilerConf_PathConf) ProtoMessage() {}
func (x *FilerConf_PathConf) ProtoReflect() protoreflect.Message {
mi := &file_filer_proto_msgTypes[51]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use FilerConf_PathConf.ProtoReflect.Descriptor instead.
func (*FilerConf_PathConf) Descriptor() ([]byte, []int) {
return file_filer_proto_rawDescGZIP(), []int{47, 0}
}
func (x *FilerConf_PathConf) GetLocationPrefix() string {
if x != nil {
return x.LocationPrefix
}
return ""
}
func (x *FilerConf_PathConf) GetCollection() string {
if x != nil {
return x.Collection
}
return ""
}
func (x *FilerConf_PathConf) GetReplication() string {
if x != nil {
return x.Replication
}
return ""
}
func (x *FilerConf_PathConf) GetTtl() string {
if x != nil {
return x.Ttl
}
return ""
}
func (x *FilerConf_PathConf) GetDiskType() FilerConf_PathConf_DiskType {
if x != nil {
return x.DiskType
}
return FilerConf_PathConf_HDD
}
var File_filer_proto protoreflect.FileDescriptor
var file_filer_proto_rawDesc = []byte{
@@ -3433,7 +3614,28 @@ var file_filer_proto_rawDesc = []byte{
0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c,
0x75, 0x65, 0x22, 0x25, 0x0a, 0x0d, 0x4b, 0x76, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x32, 0xdc, 0x0c, 0x0a, 0x0c, 0x53, 0x65,
0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0xcd, 0x02, 0x0a, 0x09, 0x46, 0x69,
0x6c, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69,
0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f,
0x6e, 0x12, 0x3a, 0x0a, 0x09, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02,
0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e,
0x46, 0x69, 0x6c, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x43, 0x6f,
0x6e, 0x66, 0x52, 0x09, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0xe9, 0x01,
0x0a, 0x08, 0x50, 0x61, 0x74, 0x68, 0x43, 0x6f, 0x6e, 0x66, 0x12, 0x27, 0x0a, 0x0f, 0x6c, 0x6f,
0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x0e, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x65,
0x66, 0x69, 0x78, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f,
0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74,
0x69, 0x6f, 0x6e, 0x12, 0x20, 0x0a, 0x0b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x74, 0x6c, 0x18, 0x04, 0x20, 0x01,
0x28, 0x09, 0x52, 0x03, 0x74, 0x74, 0x6c, 0x12, 0x42, 0x0a, 0x09, 0x64, 0x69, 0x73, 0x6b, 0x5f,
0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x25, 0x2e, 0x66, 0x69, 0x6c,
0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x2e,
0x50, 0x61, 0x74, 0x68, 0x43, 0x6f, 0x6e, 0x66, 0x2e, 0x44, 0x69, 0x73, 0x6b, 0x54, 0x79, 0x70,
0x65, 0x52, 0x08, 0x64, 0x69, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x22, 0x1c, 0x0a, 0x08, 0x44,
0x69, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x48, 0x44, 0x44, 0x10, 0x00,
0x12, 0x07, 0x0a, 0x03, 0x53, 0x53, 0x44, 0x10, 0x01, 0x32, 0xdc, 0x0c, 0x0a, 0x0c, 0x53, 0x65,
0x61, 0x77, 0x65, 0x65, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x72, 0x12, 0x67, 0x0a, 0x14, 0x4c, 0x6f,
0x6f, 0x6b, 0x75, 0x70, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, 0x74,
0x72, 0x79, 0x12, 0x25, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f,
@@ -3556,123 +3758,129 @@ func file_filer_proto_rawDescGZIP() []byte {
return file_filer_proto_rawDescData
}
var file_filer_proto_msgTypes = make([]protoimpl.MessageInfo, 50)
var file_filer_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_filer_proto_msgTypes = make([]protoimpl.MessageInfo, 52)
var file_filer_proto_goTypes = []interface{}{
(*LookupDirectoryEntryRequest)(nil), // 0: filer_pb.LookupDirectoryEntryRequest
(*LookupDirectoryEntryResponse)(nil), // 1: filer_pb.LookupDirectoryEntryResponse
(*ListEntriesRequest)(nil), // 2: filer_pb.ListEntriesRequest
(*ListEntriesResponse)(nil), // 3: filer_pb.ListEntriesResponse
(*Entry)(nil), // 4: filer_pb.Entry
(*FullEntry)(nil), // 5: filer_pb.FullEntry
(*EventNotification)(nil), // 6: filer_pb.EventNotification
(*FileChunk)(nil), // 7: filer_pb.FileChunk
(*FileChunkManifest)(nil), // 8: filer_pb.FileChunkManifest
(*FileId)(nil), // 9: filer_pb.FileId
(*FuseAttributes)(nil), // 10: filer_pb.FuseAttributes
(*CreateEntryRequest)(nil), // 11: filer_pb.CreateEntryRequest
(*CreateEntryResponse)(nil), // 12: filer_pb.CreateEntryResponse
(*UpdateEntryRequest)(nil), // 13: filer_pb.UpdateEntryRequest
(*UpdateEntryResponse)(nil), // 14: filer_pb.UpdateEntryResponse
(*AppendToEntryRequest)(nil), // 15: filer_pb.AppendToEntryRequest
(*AppendToEntryResponse)(nil), // 16: filer_pb.AppendToEntryResponse
(*DeleteEntryRequest)(nil), // 17: filer_pb.DeleteEntryRequest
(*DeleteEntryResponse)(nil), // 18: filer_pb.DeleteEntryResponse
(*AtomicRenameEntryRequest)(nil), // 19: filer_pb.AtomicRenameEntryRequest
(*AtomicRenameEntryResponse)(nil), // 20: filer_pb.AtomicRenameEntryResponse
(*AssignVolumeRequest)(nil), // 21: filer_pb.AssignVolumeRequest
(*AssignVolumeResponse)(nil), // 22: filer_pb.AssignVolumeResponse
(*LookupVolumeRequest)(nil), // 23: filer_pb.LookupVolumeRequest
(*Locations)(nil), // 24: filer_pb.Locations
(*Location)(nil), // 25: filer_pb.Location
(*LookupVolumeResponse)(nil), // 26: filer_pb.LookupVolumeResponse
(*Collection)(nil), // 27: filer_pb.Collection
(*CollectionListRequest)(nil), // 28: filer_pb.CollectionListRequest
(*CollectionListResponse)(nil), // 29: filer_pb.CollectionListResponse
(*DeleteCollectionRequest)(nil), // 30: filer_pb.DeleteCollectionRequest
(*DeleteCollectionResponse)(nil), // 31: filer_pb.DeleteCollectionResponse
(*StatisticsRequest)(nil), // 32: filer_pb.StatisticsRequest
(*StatisticsResponse)(nil), // 33: filer_pb.StatisticsResponse
(*GetFilerConfigurationRequest)(nil), // 34: filer_pb.GetFilerConfigurationRequest
(*GetFilerConfigurationResponse)(nil), // 35: filer_pb.GetFilerConfigurationResponse
(*SubscribeMetadataRequest)(nil), // 36: filer_pb.SubscribeMetadataRequest
(*SubscribeMetadataResponse)(nil), // 37: filer_pb.SubscribeMetadataResponse
(*LogEntry)(nil), // 38: filer_pb.LogEntry
(*KeepConnectedRequest)(nil), // 39: filer_pb.KeepConnectedRequest
(*KeepConnectedResponse)(nil), // 40: filer_pb.KeepConnectedResponse
(*LocateBrokerRequest)(nil), // 41: filer_pb.LocateBrokerRequest
(*LocateBrokerResponse)(nil), // 42: filer_pb.LocateBrokerResponse
(*KvGetRequest)(nil), // 43: filer_pb.KvGetRequest
(*KvGetResponse)(nil), // 44: filer_pb.KvGetResponse
(*KvPutRequest)(nil), // 45: filer_pb.KvPutRequest
(*KvPutResponse)(nil), // 46: filer_pb.KvPutResponse
nil, // 47: filer_pb.Entry.ExtendedEntry
nil, // 48: filer_pb.LookupVolumeResponse.LocationsMapEntry
(*LocateBrokerResponse_Resource)(nil), // 49: filer_pb.LocateBrokerResponse.Resource
(FilerConf_PathConf_DiskType)(0), // 0: filer_pb.FilerConf.PathConf.DiskType
(*LookupDirectoryEntryRequest)(nil), // 1: filer_pb.LookupDirectoryEntryRequest
(*LookupDirectoryEntryResponse)(nil), // 2: filer_pb.LookupDirectoryEntryResponse
(*ListEntriesRequest)(nil), // 3: filer_pb.ListEntriesRequest
(*ListEntriesResponse)(nil), // 4: filer_pb.ListEntriesResponse
(*Entry)(nil), // 5: filer_pb.Entry
(*FullEntry)(nil), // 6: filer_pb.FullEntry
(*EventNotification)(nil), // 7: filer_pb.EventNotification
(*FileChunk)(nil), // 8: filer_pb.FileChunk
(*FileChunkManifest)(nil), // 9: filer_pb.FileChunkManifest
(*FileId)(nil), // 10: filer_pb.FileId
(*FuseAttributes)(nil), // 11: filer_pb.FuseAttributes
(*CreateEntryRequest)(nil), // 12: filer_pb.CreateEntryRequest
(*CreateEntryResponse)(nil), // 13: filer_pb.CreateEntryResponse
(*UpdateEntryRequest)(nil), // 14: filer_pb.UpdateEntryRequest
(*UpdateEntryResponse)(nil), // 15: filer_pb.UpdateEntryResponse
(*AppendToEntryRequest)(nil), // 16: filer_pb.AppendToEntryRequest
(*AppendToEntryResponse)(nil), // 17: filer_pb.AppendToEntryResponse
(*DeleteEntryRequest)(nil), // 18: filer_pb.DeleteEntryRequest
(*DeleteEntryResponse)(nil), // 19: filer_pb.DeleteEntryResponse
(*AtomicRenameEntryRequest)(nil), // 20: filer_pb.AtomicRenameEntryRequest
(*AtomicRenameEntryResponse)(nil), // 21: filer_pb.AtomicRenameEntryResponse
(*AssignVolumeRequest)(nil), // 22: filer_pb.AssignVolumeRequest
(*AssignVolumeResponse)(nil), // 23: filer_pb.AssignVolumeResponse
(*LookupVolumeRequest)(nil), // 24: filer_pb.LookupVolumeRequest
(*Locations)(nil), // 25: filer_pb.Locations
(*Location)(nil), // 26: filer_pb.Location
(*LookupVolumeResponse)(nil), // 27: filer_pb.LookupVolumeResponse
(*Collection)(nil), // 28: filer_pb.Collection
(*CollectionListRequest)(nil), // 29: filer_pb.CollectionListRequest
(*CollectionListResponse)(nil), // 30: filer_pb.CollectionListResponse
(*DeleteCollectionRequest)(nil), // 31: filer_pb.DeleteCollectionRequest
(*DeleteCollectionResponse)(nil), // 32: filer_pb.DeleteCollectionResponse
(*StatisticsRequest)(nil), // 33: filer_pb.StatisticsRequest
(*StatisticsResponse)(nil), // 34: filer_pb.StatisticsResponse
(*GetFilerConfigurationRequest)(nil), // 35: filer_pb.GetFilerConfigurationRequest
(*GetFilerConfigurationResponse)(nil), // 36: filer_pb.GetFilerConfigurationResponse
(*SubscribeMetadataRequest)(nil), // 37: filer_pb.SubscribeMetadataRequest
(*SubscribeMetadataResponse)(nil), // 38: filer_pb.SubscribeMetadataResponse
(*LogEntry)(nil), // 39: filer_pb.LogEntry
(*KeepConnectedRequest)(nil), // 40: filer_pb.KeepConnectedRequest
(*KeepConnectedResponse)(nil), // 41: filer_pb.KeepConnectedResponse
(*LocateBrokerRequest)(nil), // 42: filer_pb.LocateBrokerRequest
(*LocateBrokerResponse)(nil), // 43: filer_pb.LocateBrokerResponse
(*KvGetRequest)(nil), // 44: filer_pb.KvGetRequest
(*KvGetResponse)(nil), // 45: filer_pb.KvGetResponse
(*KvPutRequest)(nil), // 46: filer_pb.KvPutRequest
(*KvPutResponse)(nil), // 47: filer_pb.KvPutResponse
(*FilerConf)(nil), // 48: filer_pb.FilerConf
nil, // 49: filer_pb.Entry.ExtendedEntry
nil, // 50: filer_pb.LookupVolumeResponse.LocationsMapEntry
(*LocateBrokerResponse_Resource)(nil), // 51: filer_pb.LocateBrokerResponse.Resource
(*FilerConf_PathConf)(nil), // 52: filer_pb.FilerConf.PathConf
} }
var file_filer_proto_depIdxs = []int32{
4, // 0: filer_pb.LookupDirectoryEntryResponse.entry:type_name -> filer_pb.Entry
4, // 1: filer_pb.ListEntriesResponse.entry:type_name -> filer_pb.Entry
7, // 2: filer_pb.Entry.chunks:type_name -> filer_pb.FileChunk
10, // 3: filer_pb.Entry.attributes:type_name -> filer_pb.FuseAttributes
47, // 4: filer_pb.Entry.extended:type_name -> filer_pb.Entry.ExtendedEntry
4, // 5: filer_pb.FullEntry.entry:type_name -> filer_pb.Entry
4, // 6: filer_pb.EventNotification.old_entry:type_name -> filer_pb.Entry
4, // 7: filer_pb.EventNotification.new_entry:type_name -> filer_pb.Entry
9, // 8: filer_pb.FileChunk.fid:type_name -> filer_pb.FileId
9, // 9: filer_pb.FileChunk.source_fid:type_name -> filer_pb.FileId
7, // 10: filer_pb.FileChunkManifest.chunks:type_name -> filer_pb.FileChunk
4, // 11: filer_pb.CreateEntryRequest.entry:type_name -> filer_pb.Entry
4, // 12: filer_pb.UpdateEntryRequest.entry:type_name -> filer_pb.Entry
7, // 13: filer_pb.AppendToEntryRequest.chunks:type_name -> filer_pb.FileChunk
25, // 14: filer_pb.Locations.locations:type_name -> filer_pb.Location
48, // 15: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry
27, // 16: filer_pb.CollectionListResponse.collections:type_name -> filer_pb.Collection
6, // 17: filer_pb.SubscribeMetadataResponse.event_notification:type_name -> filer_pb.EventNotification
49, // 18: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource
24, // 19: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations
0, // 20: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest
2, // 21: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest
11, // 22: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest
13, // 23: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest
15, // 24: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest
17, // 25: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest
19, // 26: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest
21, // 27: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest
23, // 28: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest
28, // 29: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest
30, // 30: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest
32, // 31: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest
34, // 32: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest
36, // 33: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest
36, // 34: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest
39, // 35: filer_pb.SeaweedFiler.KeepConnected:input_type -> filer_pb.KeepConnectedRequest
41, // 36: filer_pb.SeaweedFiler.LocateBroker:input_type -> filer_pb.LocateBrokerRequest
43, // 37: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest
45, // 38: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest
1, // 39: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse
3, // 40: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse
12, // 41: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse
14, // 42: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse
16, // 43: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse
18, // 44: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse
20, // 45: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse
22, // 46: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse
26, // 47: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse
29, // 48: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse
31, // 49: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse
33, // 50: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse
35, // 51: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse
37, // 52: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse
37, // 53: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse
40, // 54: filer_pb.SeaweedFiler.KeepConnected:output_type -> filer_pb.KeepConnectedResponse
42, // 55: filer_pb.SeaweedFiler.LocateBroker:output_type -> filer_pb.LocateBrokerResponse
44, // 56: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse
46, // 57: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse
39, // [39:58] is the sub-list for method output_type
20, // [20:39] is the sub-list for method input_type
20, // [20:20] is the sub-list for extension type_name
20, // [20:20] is the sub-list for extension extendee
0, // [0:20] is the sub-list for field type_name
5, // 0: filer_pb.LookupDirectoryEntryResponse.entry:type_name -> filer_pb.Entry
5, // 1: filer_pb.ListEntriesResponse.entry:type_name -> filer_pb.Entry
8, // 2: filer_pb.Entry.chunks:type_name -> filer_pb.FileChunk
11, // 3: filer_pb.Entry.attributes:type_name -> filer_pb.FuseAttributes
49, // 4: filer_pb.Entry.extended:type_name -> filer_pb.Entry.ExtendedEntry
5, // 5: filer_pb.FullEntry.entry:type_name -> filer_pb.Entry
5, // 6: filer_pb.EventNotification.old_entry:type_name -> filer_pb.Entry
5, // 7: filer_pb.EventNotification.new_entry:type_name -> filer_pb.Entry
10, // 8: filer_pb.FileChunk.fid:type_name -> filer_pb.FileId
10, // 9: filer_pb.FileChunk.source_fid:type_name -> filer_pb.FileId
8, // 10: filer_pb.FileChunkManifest.chunks:type_name -> filer_pb.FileChunk
5, // 11: filer_pb.CreateEntryRequest.entry:type_name -> filer_pb.Entry
5, // 12: filer_pb.UpdateEntryRequest.entry:type_name -> filer_pb.Entry
8, // 13: filer_pb.AppendToEntryRequest.chunks:type_name -> filer_pb.FileChunk
26, // 14: filer_pb.Locations.locations:type_name -> filer_pb.Location
50, // 15: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry
28, // 16: filer_pb.CollectionListResponse.collections:type_name -> filer_pb.Collection
7, // 17: filer_pb.SubscribeMetadataResponse.event_notification:type_name -> filer_pb.EventNotification
51, // 18: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource
52, // 19: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf
25, // 20: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations
0, // 21: filer_pb.FilerConf.PathConf.disk_type:type_name -> filer_pb.FilerConf.PathConf.DiskType
1, // 22: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest
3, // 23: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest
12, // 24: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest
14, // 25: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest
16, // 26: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest
18, // 27: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest
20, // 28: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest
22, // 29: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest
24, // 30: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest
29, // 31: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest
31, // 32: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest
33, // 33: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest
35, // 34: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest
37, // 35: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest
37, // 36: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest
40, // 37: filer_pb.SeaweedFiler.KeepConnected:input_type -> filer_pb.KeepConnectedRequest
42, // 38: filer_pb.SeaweedFiler.LocateBroker:input_type -> filer_pb.LocateBrokerRequest
44, // 39: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest
46, // 40: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest
2, // 41: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse
4, // 42: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse
13, // 43: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse
15, // 44: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse
17, // 45: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse
19, // 46: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse
21, // 47: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse
23, // 48: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse
27, // 49: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse
30, // 50: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse
32, // 51: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse
34, // 52: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse
36, // 53: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse
38, // 54: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse
38, // 55: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse
41, // 56: filer_pb.SeaweedFiler.KeepConnected:output_type -> filer_pb.KeepConnectedResponse
43, // 57: filer_pb.SeaweedFiler.LocateBroker:output_type -> filer_pb.LocateBrokerResponse
45, // 58: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse
47, // 59: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse
41, // [41:60] is the sub-list for method output_type
22, // [22:41] is the sub-list for method input_type
22, // [22:22] is the sub-list for extension type_name
22, // [22:22] is the sub-list for extension extendee
0, // [0:22] is the sub-list for field type_name
} }
func init() { file_filer_proto_init() }
@@ -4245,7 +4453,19 @@ func file_filer_proto_init() {
return nil
}
}
file_filer_proto_msgTypes[49].Exporter = func(v interface{}, i int) interface{} {
file_filer_proto_msgTypes[47].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FilerConf); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_filer_proto_msgTypes[50].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*LocateBrokerResponse_Resource); i {
case 0:
return &v.state
@@ -4257,19 +4477,32 @@ func file_filer_proto_init() {
return nil
}
}
file_filer_proto_msgTypes[51].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FilerConf_PathConf); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_filer_proto_rawDesc,
NumEnums: 0,
NumMessages: 50,
NumEnums: 1,
NumMessages: 52,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_filer_proto_goTypes,
DependencyIndexes: file_filer_proto_depIdxs,
EnumInfos: file_filer_proto_enumTypes,
MessageInfos: file_filer_proto_msgTypes,
}.Build()
File_filer_proto = out.File

6
weed/pb/master.proto

@@ -130,6 +130,7 @@ message VolumeLocation {
repeated uint32 new_vids = 3;
repeated uint32 deleted_vids = 4;
string leader = 5; // optional when leader is not itself
string data_center = 6; // optional when DataCenter is in use
}
message LookupVolumeRequest {
@@ -187,11 +188,6 @@ message StatisticsResponse {
//
// collection related
//
message StorageType {
string replication = 1;
string ttl = 2;
}
message Collection {
string name = 1;
}
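VolumeLocation now carries an optional data_center, matching the dataCenter argument threaded through NewMasterClient and NewFiler earlier in this diff. The wdclient changes themselves (masterclient.go, vid_map.go) are not shown here, so the following is only a hedged sketch of the general idea, using a hypothetical Location struct rather than the real client types:

```go
package main

import "fmt"

// Location is a hypothetical stand-in for a volume server location record;
// the field names are illustrative only.
type Location struct {
	Url        string
	DataCenter string
}

// preferSameDataCenter returns the locations in the caller's data center if
// any exist, otherwise the full list, so reads never fail just because no
// local replica is available.
func preferSameDataCenter(locations []Location, myDataCenter string) []Location {
	if myDataCenter == "" {
		return locations
	}
	var same []Location
	for _, loc := range locations {
		if loc.DataCenter == myDataCenter {
			same = append(same, loc)
		}
	}
	if len(same) > 0 {
		return same
	}
	return locations
}

func main() {
	locs := []Location{
		{Url: "10.0.1.5:8080", DataCenter: "dc1"},
		{Url: "10.0.2.7:8080", DataCenter: "dc2"},
	}
	fmt.Println(preferSameDataCenter(locs, "dc1")) // keeps only the dc1 replica
}
```

The fallback to the full list matters: a client outside any known data center, or one whose data center holds no replica, should still be able to read.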

weed/pb/master_pb/master.pb.go (1001)
File diff suppressed because it is too large

weed/s3api/auth_credentials.go (44)

@@ -3,10 +3,11 @@ package s3api
import (
"bytes"
"fmt"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
"github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/jsonpb"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
@@ -127,8 +128,14 @@ func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) htt
}
return func(w http.ResponseWriter, r *http.Request) {
errCode := iam.authRequest(r, action)
identity, errCode := iam.authRequest(r, action)
if errCode == s3err.ErrNone {
if identity != nil && identity.Name != "" {
r.Header.Set(xhttp.AmzIdentityId, identity.Name)
if identity.isAdmin() {
r.Header.Set(xhttp.AmzIsAdmin, "true")
}
}
f(w, r)
return
}
@@ -137,16 +144,16 @@
}
// check whether the request has valid access keys
func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) s3err.ErrorCode {
func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) (*Identity, s3err.ErrorCode) {
var identity *Identity
var s3Err s3err.ErrorCode
var found bool
switch getRequestAuthType(r) {
case authTypeStreamingSigned:
return s3err.ErrNone
return identity, s3err.ErrNone
case authTypeUnknown:
glog.V(3).Infof("unknown auth type")
return s3err.ErrAccessDenied
return identity, s3err.ErrAccessDenied
case authTypePresignedV2, authTypeSignedV2:
glog.V(3).Infof("v2 auth type")
identity, s3Err = iam.isReqAuthenticatedV2(r)
@@ -155,22 +162,22 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
identity, s3Err = iam.reqSignatureV4Verify(r)
case authTypePostPolicy:
glog.V(3).Infof("post policy auth type")
return s3err.ErrNone
return identity, s3err.ErrNone
case authTypeJWT:
glog.V(3).Infof("jwt auth type")
return s3err.ErrNotImplemented
return identity, s3err.ErrNotImplemented
case authTypeAnonymous:
identity, found = iam.lookupAnonymous()
if !found {
return s3err.ErrAccessDenied
return identity, s3err.ErrAccessDenied
}
default:
return s3err.ErrNotImplemented
return identity, s3err.ErrNotImplemented
}
glog.V(3).Infof("auth error: %v", s3Err)
if s3Err != s3err.ErrNone {
return s3Err
return identity, s3Err
}
glog.V(3).Infof("user name: %v actions: %v", identity.Name, identity.Actions)
@@ -178,19 +185,17 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
bucket, _ := getBucketAndObject(r)
if !identity.canDo(action, bucket) {
return s3err.ErrAccessDenied
return identity, s3err.ErrAccessDenied
}
return s3err.ErrNone
return identity, s3err.ErrNone
}
func (identity *Identity) canDo(action Action, bucket string) bool {
for _, a := range identity.Actions {
if a == "Admin" {
if identity.isAdmin() {
return true
}
}
for _, a := range identity.Actions {
if a == action {
return true
@@ -207,3 +212,12 @@ func (identity *Identity) canDo(action Action, bucket string) bool {
}
return false
}
func (identity *Identity) isAdmin() bool {
for _, a := range identity.Actions {
if a == "Admin" {
return true
}
}
return false
}

weed/s3api/filer_util.go (6)

@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn func(entry *filer_pb.Entry)) error {
@@ -75,6 +76,11 @@ func (s3a *S3ApiServer) exists(parentDirectoryPath string, entryName string, isD
}
func (s3a *S3ApiServer) getEntry(parentDirectoryPath, entryName string) (entry *filer_pb.Entry, err error) {
fullPath := util.NewFullPath(parentDirectoryPath, entryName)
return filer_pb.GetEntry(s3a, fullPath)
}
func objectKey(key *string) *string { func objectKey(key *string) *string {
if strings.HasPrefix(*key, "/") { if strings.HasPrefix(*key, "/") {
t := (*key)[1:] t := (*key)[1:]

6  weed/s3api/http/header.go

@@ -28,3 +28,9 @@ const (
 	AmzObjectTagging = "X-Amz-Tagging"
 	AmzTagCount      = "x-amz-tagging-count"
 )
+
+// Non-Standard S3 HTTP request constants
+const (
+	AmzIdentityId = "x-amz-identity-id"
+	AmzIsAdmin    = "x-amz-is-admin" // only set to http request header as a context
+)

80  weed/s3api/s3api_bucket_handlers.go

@@ -4,11 +4,13 @@ import (
 	"context"
 	"encoding/xml"
 	"fmt"
-	"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
 	"math"
 	"net/http"
 	"time"
+	xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
+	"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/service/s3"
@@ -33,9 +35,14 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
 		return
 	}
+	identityId := r.Header.Get(xhttp.AmzIdentityId)
 	var buckets []*s3.Bucket
 	for _, entry := range entries {
 		if entry.IsDirectory {
+			if !s3a.hasAccess(r, entry) {
+				continue
+			}
 			buckets = append(buckets, &s3.Bucket{
 				Name:         aws.String(entry.Name),
 				CreationDate: aws.Time(time.Unix(entry.Attributes.Crtime, 0).UTC()),
@@ -45,8 +52,8 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
 	response = ListAllMyBucketsResult{
 		Owner: &s3.Owner{
-			ID:          aws.String(""),
-			DisplayName: aws.String(""),
+			ID:          aws.String(identityId),
+			DisplayName: aws.String(identityId),
 		},
 		Buckets: buckets,
 	}
@@ -80,13 +87,25 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
 		writeErrorResponse(w, s3err.ErrInternalError, r.URL)
 		return
 	}
+	if exist, err := s3a.exists(s3a.option.BucketsPath, bucket, true); err == nil && exist {
+		errCode = s3err.ErrBucketAlreadyExists
+	}
 	if errCode != s3err.ErrNone {
 		writeErrorResponse(w, errCode, r.URL)
 		return
 	}
+	fn := func(entry *filer_pb.Entry) {
+		if identityId := r.Header.Get(xhttp.AmzIdentityId); identityId != "" {
+			if entry.Extended == nil {
+				entry.Extended = make(map[string][]byte)
+			}
+			entry.Extended[xhttp.AmzIdentityId] = []byte(identityId)
+		}
+	}
 	// create the folder for bucket, but lazily create actual collection
-	if err := s3a.mkdir(s3a.option.BucketsPath, bucket, nil); err != nil {
+	if err := s3a.mkdir(s3a.option.BucketsPath, bucket, fn); err != nil {
 		glog.Errorf("PutBucketHandler mkdir: %v", err)
 		writeErrorResponse(w, s3err.ErrInternalError, r.URL)
 		return
@@ -99,7 +118,18 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
 	bucket, _ := getBucketAndObject(r)
-	err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
+	if entry == nil || err == filer_pb.ErrNotFound {
+		writeErrorResponse(w, s3err.ErrNoSuchBucket, r.URL)
+		return
+	}
+	if !s3a.hasAccess(r, entry) {
+		writeErrorResponse(w, s3err.ErrAccessDenied, r.URL)
+		return
+	}
+	err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 		// delete collection
 		deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{
@@ -128,28 +158,34 @@ func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request
 	bucket, _ := getBucketAndObject(r)
-	err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-		request := &filer_pb.LookupDirectoryEntryRequest{
-			Directory: s3a.option.BucketsPath,
-			Name:      bucket,
+	entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
+	if entry == nil || err == filer_pb.ErrNotFound {
+		writeErrorResponse(w, s3err.ErrNoSuchBucket, r.URL)
+		return
 	}
-		glog.V(1).Infof("lookup bucket: %v", request)
-		if _, err := filer_pb.LookupEntry(client, request); err != nil {
-			if err == filer_pb.ErrNotFound {
-				return filer_pb.ErrNotFound
-			}
-			return fmt.Errorf("lookup bucket %s/%s: %v", s3a.option.BucketsPath, bucket, err)
+	if !s3a.hasAccess(r, entry) {
+		writeErrorResponse(w, s3err.ErrAccessDenied, r.URL)
+		return
 	}
-		return nil
-	})
+	writeSuccessResponseEmpty(w)
+}
-	if err != nil {
-		writeErrorResponse(w, s3err.ErrNoSuchBucket, r.URL)
-		return
+func (s3a *S3ApiServer) hasAccess(r *http.Request, entry *filer_pb.Entry) bool {
+	isAdmin := r.Header.Get(xhttp.AmzIsAdmin) != ""
+	if isAdmin {
+		return true
+	}
+	if entry.Extended == nil {
+		return true
 	}
-	writeSuccessResponseEmpty(w)
+	identityId := r.Header.Get(xhttp.AmzIdentityId)
+	if id, ok := entry.Extended[xhttp.AmzIdentityId]; ok {
+		if identityId != string(id) {
+			return false
+		}
+	}
+	return true
 }
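
Taken together, these handler changes give buckets a lightweight owner check: PutBucketHandler records the creator's x-amz-identity-id in the bucket entry's Extended metadata, and ListBuckets/HeadBucket/DeleteBucket call hasAccess before acting. The rule is easiest to see as a condensed restatement (a paraphrase for illustration only, not code from the commit):

    // ownerAllowed paraphrases hasAccess above: admins always pass, buckets
    // created before this change (no recorded owner) stay accessible, and
    // otherwise the recorded owner id must match the caller's identity header.
    func ownerAllowed(callerId string, callerIsAdmin bool, extended map[string][]byte) bool {
    	if callerIsAdmin {
    		return true
    	}
    	owner, recorded := extended["x-amz-identity-id"]
    	if !recorded {
    		return true
    	}
    	return callerId == string(owner)
    }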

3  weed/s3api/s3api_object_copy_handlers.go

@@ -2,6 +2,7 @@ package s3api
 import (
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
 	"net/http"
 	"net/url"
@@ -47,6 +48,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
 	}
 	defer util.CloseResponse(resp)
+	glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
 	etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body)
 	if errCode != s3err.ErrNone {
@@ -127,6 +129,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
 	}
 	defer dataReader.Close()
+	glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
 	etag, errCode := s3a.putToFiler(r, dstUrl, dataReader)
 	if errCode != s3err.ErrNone {

2  weed/s3api/s3api_object_handlers.go

@@ -323,7 +323,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
 	resp_body, ra_err := ioutil.ReadAll(resp.Body)
 	if ra_err != nil {
-		glog.Errorf("upload to filer response read: %v", ra_err)
+		glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err)
 		return etag, s3err.ErrInternalError
 	}
 	var ret weed_server.FilerPostResult

16  weed/s3api/s3api_object_multipart_handlers.go

@@ -2,6 +2,7 @@ package s3api
 import (
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
 	"net/http"
 	"net/url"
@@ -28,13 +29,13 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http
 		Key: objectKey(aws.String(object)),
 	})
+	glog.V(2).Info("NewMultipartUploadHandler", string(encodeResponse(response)), errCode)
 	if errCode != s3err.ErrNone {
 		writeErrorResponse(w, errCode, r.URL)
 		return
 	}
-	// println("NewMultipartUploadHandler", string(encodeResponse(response)))
 	writeSuccessResponseXML(w, encodeResponse(response))
 }
@@ -52,7 +53,7 @@ func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r
 		UploadId: aws.String(uploadID),
 	})
-	// println("CompleteMultipartUploadHandler", string(encodeResponse(response)), errCode)
+	glog.V(2).Info("CompleteMultipartUploadHandler", string(encodeResponse(response)), errCode)
 	if errCode != s3err.ErrNone {
 		writeErrorResponse(w, errCode, r.URL)
@@ -81,7 +82,7 @@ func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *ht
 		return
 	}
-	// println("AbortMultipartUploadHandler", string(encodeResponse(response)))
+	glog.V(2).Info("AbortMultipartUploadHandler", string(encodeResponse(response)))
 	writeSuccessResponseXML(w, encodeResponse(response))
@@ -114,13 +115,14 @@ func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *ht
 		UploadIdMarker: aws.String(uploadIDMarker),
 	})
+	glog.V(2).Info("ListMultipartUploadsHandler", string(encodeResponse(response)), errCode)
 	if errCode != s3err.ErrNone {
 		writeErrorResponse(w, errCode, r.URL)
 		return
 	}
 	// TODO handle encodingType
-	// println("ListMultipartUploadsHandler", string(encodeResponse(response)))
 	writeSuccessResponseXML(w, encodeResponse(response))
 }
@@ -147,13 +149,13 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re
 		UploadId: aws.String(uploadID),
 	})
+	glog.V(2).Info("ListObjectPartsHandler", string(encodeResponse(response)), errCode)
 	if errCode != s3err.ErrNone {
 		writeErrorResponse(w, errCode, r.URL)
 		return
 	}
-	// println("ListObjectPartsHandler", string(encodeResponse(response)))
 	writeSuccessResponseXML(w, encodeResponse(response))
 }

2  weed/server/filer_server.go

@@ -89,7 +89,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
 		glog.Fatal("master list is required!")
 	}
-	fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
+	fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
 		fs.listenersCond.Broadcast()
 	})
 	fs.filer.Cipher = option.Cipher

2  weed/server/master_grpc_server.go

@@ -88,6 +88,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 	message := &master_pb.VolumeLocation{
 		Url:        dn.Url(),
 		PublicUrl:  dn.PublicUrl,
+		DataCenter: string(dn.GetDataCenter().Id()),
 	}
 	if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
 		// process delta volume ids if exists for fast volume id updates
@@ -148,7 +149,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 		}
 	}
-
 	if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
 		ms.clientChansLock.RLock()
 		for host, ch := range ms.clientChans {

2  weed/server/master_server.go

@@ -93,7 +93,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
 		preallocateSize: preallocateSize,
 		clientChans:     make(map[string]chan *master_pb.VolumeLocation),
 		grpcDialOption:  grpcDialOption,
-		MasterClient:    wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, peers),
+		MasterClient:    wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers),
 		adminLocks:      NewAdminLocks(),
 	}
 	ms.bounedLeaderChan = make(chan int, 16)

2  weed/shell/commands.go

@@ -45,7 +45,7 @@ var (
 func NewCommandEnv(options ShellOptions) *CommandEnv {
 	ce := &CommandEnv{
 		env:          make(map[string]string),
-		MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")),
+		MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, "", strings.Split(*options.Masters, ",")),
 		option:       options,
 	}
 	ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient)

1  weed/topology/store_replicate.go

@@ -81,6 +81,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume
 			}
 			// volume server do not know about encryption
+			// TODO optimize here to compress data only once
 			_, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsCompressed(), string(n.Mime), pairMap, jwt)
 			return err
 		}); err != nil {

2  weed/util/constants.go

@@ -5,7 +5,7 @@ import (
 )
 var (
-	VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 8)
+	VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 10)
 	COMMIT  = ""
 )

1  weed/util/http_util.go

@@ -370,7 +370,6 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e
 	if err != nil {
 		return nil, err
 	}
-	defer CloseResponse(r)
 	if r.StatusCode >= 400 {
 		return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
 	}
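
The removed defer matters here: ReadUrlAsReaderCloser hands the response body back to the caller as an io.ReadCloser, so closing the response inside the function would invalidate the reader before it is consumed. Presumably the caller now owns the close, roughly like this (hypothetical caller, not code from the commit; fileUrl and rangeHeader are placeholders):

    body, err := util.ReadUrlAsReaderCloser(fileUrl, rangeHeader)
    if err != nil {
    	return err
    }
    defer body.Close()
    // ... read from body ...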

7  weed/wdclient/masterclient.go

@@ -24,14 +24,14 @@ type MasterClient struct {
 	vidMap
 }
-func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost string, clientGrpcPort uint32, masters []string) *MasterClient {
+func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost string, clientGrpcPort uint32, clientDataCenter string, masters []string) *MasterClient {
 	return &MasterClient{
 		clientType:     clientType,
 		clientHost:     clientHost,
 		grpcPort:       clientGrpcPort,
 		masters:        masters,
 		grpcDialOption: grpcDialOption,
-		vidMap:         newVidMap(),
+		vidMap:         newVidMap(clientDataCenter),
 	}
 }
@@ -89,7 +89,7 @@ func (mc *MasterClient) tryAllMasters() {
 		}
 		mc.currentMaster = ""
-		mc.vidMap = newVidMap()
+		mc.vidMap = newVidMap("")
 	}
 }
@@ -132,6 +132,7 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri
 			loc := Location{
 				Url:        volumeLocation.Url,
 				PublicUrl:  volumeLocation.PublicUrl,
+				DataCenter: volumeLocation.DataCenter,
 			}
 			for _, newVid := range volumeLocation.NewVids {
 				glog.V(1).Infof("%s: %s masterClient adds volume %d", mc.clientType, loc.Url, newVid)
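
For existing callers the only visible change is the extra clientDataCenter argument; passing "" keeps the old behaviour, as master_server.go and commands.go above do. A hypothetical data-center-aware caller might look like the sketch below (the dial option, addresses, and "dc1" are made up; it assumes the existing KeepConnectedToMaster loop in masterclient.go):

    mc := wdclient.NewMasterClient(grpc.WithInsecure(), "filer", "localhost", 0, "dc1", []string{"localhost:9333"})
    go mc.KeepConnectedToMaster()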

10  weed/wdclient/vid_map.go

@@ -18,18 +18,20 @@ const (
 type Location struct {
 	Url        string `json:"url,omitempty"`
 	PublicUrl  string `json:"publicUrl,omitempty"`
+	DataCenter string `json:"dataCenter,omitempty"`
 }
 type vidMap struct {
 	sync.RWMutex
 	vid2Locations map[uint32][]Location
+	DataCenter    string
 	cursor        int32
 }
-func newVidMap() vidMap {
+func newVidMap(dataCenter string) vidMap {
 	return vidMap{
 		vid2Locations: make(map[uint32][]Location),
+		DataCenter:    dataCenter,
 		cursor:        -1,
 	}
 }
@@ -56,7 +58,11 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er
 		return nil, fmt.Errorf("volume %d not found", id)
 	}
 	for _, loc := range locations {
+		if vc.DataCenter == "" || loc.DataCenter == "" || vc.DataCenter != loc.DataCenter {
 			serverUrls = append(serverUrls, loc.Url)
+		} else {
+			serverUrls = append([]string{loc.Url}, serverUrls...)
+		}
 	}
 	return
 }
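
The effect of the new branch is ordering, not filtering: replicas in the caller's own data center are moved to the front of the returned URL list, everything else keeps its append order. A quick test-style sketch of that behaviour (in the wdclient package, importing "testing"; it assumes the existing addLocation helper in vid_map.go, and the addresses and data center names are made up):

    func TestSameDataCenterComesFirst(t *testing.T) {
    	vm := newVidMap("dc1")
    	vm.addLocation(1, Location{Url: "10.0.0.1:8080", DataCenter: "dc2"})
    	vm.addLocation(1, Location{Url: "10.0.0.2:8080", DataCenter: "dc1"})

    	urls, err := vm.LookupVolumeServerUrl("1")
    	if err != nil {
    		t.Fatal(err)
    	}
    	// The dc1 replica should be preferred (prepended) for a dc1 client.
    	if urls[0] != "10.0.0.2:8080" {
    		t.Errorf("expected the dc1 replica first, got %v", urls)
    	}
    }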

2  weed/wdclient/vid_map_test.go

@@ -45,7 +45,7 @@ func TestLocationIndex(t *testing.T) {
 	mustOk(7, maxCursorIndex, 0)
 	// test with constructor
-	vm = newVidMap()
+	vm = newVidMap("")
 	length := 7
 	for i := 0; i < 100; i++ {
 		got, err := vm.getLocationIndex(length)
