hilimd
4 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
132 changed files with 2991 additions and 1000 deletions
-
0.github/ISSUE_TEMPLATE.md
-
8.github/workflows/release.yml
-
1.travis.yml
-
6README.md
-
3docker/Dockerfile
-
4docker/Makefile
-
3docker/compose/local-dev-compose.yml
-
2docker/compose/local-mount-compose.yml
-
9go.mod
-
14go.sum
-
4k8s/seaweedfs/Chart.yaml
-
2k8s/seaweedfs/templates/volume-service.yaml
-
2k8s/seaweedfs/values.yaml
-
52other/java/client/src/main/java/seaweedfs/client/FilerClient.java
-
5other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
-
11other/java/client/src/test/java/seaweedfs/client/SeaweedFilerTest.java
-
22other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile2.java
-
2unmaintained/repeated_vacuum/repeated_vacuum.go
-
2weed/Makefile
-
2weed/command/command.go
-
2weed/command/export.go
-
27weed/command/filer.go
-
2weed/command/filer_cat.go
-
113weed/command/filer_copy.go
-
93weed/command/gateway.go
-
97weed/command/iam.go
-
7weed/command/mount_std.go
-
2weed/command/scaffold.go
-
9weed/command/server.go
-
28weed/command/volume.go
-
4weed/filer/abstract_sql/abstract_sql_store.go
-
6weed/filer/filechunks.go
-
2weed/filer/filer_buckets.go
-
1weed/filer/filer_conf.go
-
2weed/filer/filer_delete_entry.go
-
4weed/filer/filer_notify.go
-
2weed/filer/filer_on_meta_event.go
-
34weed/filer/filer_search.go
-
2weed/filer/filerstore_wrapper.go
-
6weed/filer/leveldb/leveldb_store_test.go
-
6weed/filer/leveldb2/leveldb2_store_test.go
-
6weed/filer/leveldb3/leveldb3_store_test.go
-
10weed/filer/read_write.go
-
22weed/filer/reader_at.go
-
5weed/filer/reader_at_test.go
-
6weed/filer/rocksdb/rocksdb_store_test.go
-
26weed/filer/stream.go
-
305weed/filesys/dir.go
-
21weed/filesys/dir_link.go
-
21weed/filesys/dir_rename.go
-
3weed/filesys/dirty_page.go
-
104weed/filesys/file.go
-
49weed/filesys/filehandle.go
-
4weed/filesys/meta_cache/meta_cache.go
-
8weed/filesys/meta_cache/meta_cache_init.go
-
70weed/filesys/wfs.go
-
59weed/filesys/wfs_write.go
-
2weed/glog/glog.go
-
105weed/iamapi/iamapi_handlers.go
-
453weed/iamapi/iamapi_management_handlers.go
-
103weed/iamapi/iamapi_response.go
-
149weed/iamapi/iamapi_server.go
-
181weed/iamapi/iamapi_test.go
-
45weed/messaging/broker/broker_append.go
-
1weed/operation/assign_file_id.go
-
11weed/operation/upload_content.go
-
1weed/pb/master.proto
-
206weed/pb/master_pb/master.pb.go
-
45weed/replication/sink/filersink/fetch_write.go
-
28weed/s3api/auth_credentials.go
-
30weed/s3api/auth_signature_v4.go
-
2weed/s3api/auto_signature_v4_test.go
-
4weed/s3api/chunked_reader_v4.go
-
8weed/s3api/s3api_object_handlers.go
-
15weed/s3api/s3api_objects_list_handlers.go
-
8weed/server/common.go
-
2weed/server/filer_grpc_server.go
-
2weed/server/filer_grpc_server_rename.go
-
12weed/server/filer_server_handlers_read.go
-
3weed/server/filer_server_handlers_read_dir.go
-
20weed/server/filer_server_handlers_tagging.go
-
8weed/server/filer_server_handlers_write_autochunk.go
-
169weed/server/filer_server_handlers_write_upload.go
-
2weed/server/filer_ui/breadcrumb.go
-
2weed/server/filer_ui/templates.go
-
106weed/server/gateway_server.go
-
21weed/server/master_grpc_server_admin.go
-
102weed/server/master_grpc_server_volume.go
-
22weed/server/master_server.go
-
20weed/server/master_server_handlers.go
-
10weed/server/master_server_handlers_admin.go
-
12weed/server/master_server_handlers_ui.go
-
10weed/server/master_ui/templates.go
-
9weed/server/volume_grpc_vacuum.go
-
4weed/server/volume_server.go
-
7weed/server/volume_server_handlers_read.go
-
3weed/server/volume_server_handlers_write.go
-
2weed/server/volume_server_ui/templates.go
-
41weed/server/webdav_server.go
-
47weed/shell/command_ec_encode.go
@ -1,5 +1,5 @@ |
|||||
apiVersion: v1 |
apiVersion: v1 |
||||
description: SeaweedFS |
description: SeaweedFS |
||||
name: seaweedfs |
name: seaweedfs |
||||
appVersion: "2.38" |
|
||||
version: 2.38 |
|
||||
|
appVersion: "2.43" |
||||
|
version: 2.43 |
@ -0,0 +1,22 @@ |
|||||
|
package com.seaweedfs.examples; |
||||
|
|
||||
|
import com.google.common.io.Files; |
||||
|
import seaweedfs.client.FilerClient; |
||||
|
import seaweedfs.client.SeaweedOutputStream; |
||||
|
|
||||
|
import java.io.File; |
||||
|
import java.io.IOException; |
||||
|
|
||||
|
public class ExampleWriteFile2 { |
||||
|
|
||||
|
public static void main(String[] args) throws IOException { |
||||
|
|
||||
|
FilerClient filerClient = new FilerClient("localhost", 18888); |
||||
|
|
||||
|
SeaweedOutputStream seaweedOutputStream = new SeaweedOutputStream(filerClient, "/test/1"); |
||||
|
Files.copy(new File("/etc/resolv.conf"), seaweedOutputStream); |
||||
|
seaweedOutputStream.close(); |
||||
|
|
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,93 @@ |
|||||
|
package command |
||||
|
|
||||
|
import ( |
||||
|
"net/http" |
||||
|
"strconv" |
||||
|
"strings" |
||||
|
"time" |
||||
|
|
||||
|
"github.com/chrislusf/seaweedfs/weed/glog" |
||||
|
"github.com/chrislusf/seaweedfs/weed/server" |
||||
|
"github.com/chrislusf/seaweedfs/weed/util" |
||||
|
) |
||||
|
|
||||
|
var ( |
||||
|
gatewayOptions GatewayOptions |
||||
|
) |
||||
|
|
||||
|
type GatewayOptions struct { |
||||
|
masters *string |
||||
|
filers *string |
||||
|
bindIp *string |
||||
|
port *int |
||||
|
maxMB *int |
||||
|
} |
||||
|
|
||||
|
func init() { |
||||
|
cmdGateway.Run = runGateway // break init cycle
|
||||
|
gatewayOptions.masters = cmdGateway.Flag.String("master", "localhost:9333", "comma-separated master servers") |
||||
|
gatewayOptions.filers = cmdGateway.Flag.String("filer", "localhost:8888", "comma-separated filer servers") |
||||
|
gatewayOptions.bindIp = cmdGateway.Flag.String("ip.bind", "localhost", "ip address to bind to") |
||||
|
gatewayOptions.port = cmdGateway.Flag.Int("port", 5647, "gateway http listen port") |
||||
|
gatewayOptions.maxMB = cmdGateway.Flag.Int("maxMB", 4, "split files larger than the limit") |
||||
|
} |
||||
|
|
||||
|
var cmdGateway = &Command{ |
||||
|
UsageLine: "gateway -port=8888 -master=<ip:port>[,<ip:port>]* -filer=<ip:port>[,<ip:port>]*", |
||||
|
Short: "start a gateway server that points to a list of master servers or a list of filers", |
||||
|
Long: `start a gateway server which accepts REST operation to write any blobs, files, or topic messages. |
||||
|
|
||||
|
POST /blobs/ |
||||
|
upload the blob and return a chunk id |
||||
|
DELETE /blobs/<chunk_id> |
||||
|
delete a chunk id |
||||
|
|
||||
|
/* |
||||
|
POST /files/path/to/a/file |
||||
|
save /path/to/a/file on filer |
||||
|
DELETE /files/path/to/a/file |
||||
|
delete /path/to/a/file on filer |
||||
|
|
||||
|
POST /topics/topicName |
||||
|
save on filer to /topics/topicName/<ds>/ts.json |
||||
|
*/ |
||||
|
`, |
||||
|
} |
||||
|
|
||||
|
func runGateway(cmd *Command, args []string) bool { |
||||
|
|
||||
|
util.LoadConfiguration("security", false) |
||||
|
|
||||
|
gatewayOptions.startGateway() |
||||
|
|
||||
|
return true |
||||
|
} |
||||
|
|
||||
|
func (gw *GatewayOptions) startGateway() { |
||||
|
|
||||
|
defaultMux := http.NewServeMux() |
||||
|
|
||||
|
_, gws_err := weed_server.NewGatewayServer(defaultMux, &weed_server.GatewayOption{ |
||||
|
Masters: strings.Split(*gw.masters, ","), |
||||
|
Filers: strings.Split(*gw.filers, ","), |
||||
|
MaxMB: *gw.maxMB, |
||||
|
}) |
||||
|
if gws_err != nil { |
||||
|
glog.Fatalf("Gateway startup error: %v", gws_err) |
||||
|
} |
||||
|
|
||||
|
glog.V(0).Infof("Start Seaweed Gateway %s at %s:%d", util.Version(), *gw.bindIp, *gw.port) |
||||
|
gatewayListener, e := util.NewListener( |
||||
|
*gw.bindIp+":"+strconv.Itoa(*gw.port), |
||||
|
time.Duration(10)*time.Second, |
||||
|
) |
||||
|
if e != nil { |
||||
|
glog.Fatalf("Filer listener error: %v", e) |
||||
|
} |
||||
|
|
||||
|
httpS := &http.Server{Handler: defaultMux} |
||||
|
if err := httpS.Serve(gatewayListener); err != nil { |
||||
|
glog.Fatalf("Gateway Fail to serve: %v", e) |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,97 @@ |
|||||
|
package command |
||||
|
|
||||
|
import ( |
||||
|
"context" |
||||
|
"fmt" |
||||
|
"net/http" |
||||
|
|
||||
|
"github.com/chrislusf/seaweedfs/weed/glog" |
||||
|
"github.com/chrislusf/seaweedfs/weed/iamapi" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/security" |
||||
|
"github.com/chrislusf/seaweedfs/weed/util" |
||||
|
"github.com/gorilla/mux" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
var ( |
||||
|
iamStandaloneOptions IamOptions |
||||
|
) |
||||
|
|
||||
|
type IamOptions struct { |
||||
|
filer *string |
||||
|
masters *string |
||||
|
port *int |
||||
|
} |
||||
|
|
||||
|
func init() { |
||||
|
cmdIam.Run = runIam // break init cycle
|
||||
|
iamStandaloneOptions.filer = cmdIam.Flag.String("filer", "localhost:8888", "filer server address") |
||||
|
iamStandaloneOptions.masters = cmdIam.Flag.String("master", "localhost:9333", "comma-separated master servers") |
||||
|
iamStandaloneOptions.port = cmdIam.Flag.Int("port", 8111, "iam server http listen port") |
||||
|
} |
||||
|
|
||||
|
var cmdIam = &Command{ |
||||
|
UsageLine: "iam [-port=8111] [-filer=<ip:port>] [-masters=<ip:port>,<ip:port>]", |
||||
|
Short: "start a iam API compatible server", |
||||
|
Long: "start a iam API compatible server.", |
||||
|
} |
||||
|
|
||||
|
func runIam(cmd *Command, args []string) bool { |
||||
|
return iamStandaloneOptions.startIamServer() |
||||
|
} |
||||
|
|
||||
|
func (iamopt *IamOptions) startIamServer() bool { |
||||
|
filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*iamopt.filer) |
||||
|
if err != nil { |
||||
|
glog.Fatal(err) |
||||
|
return false |
||||
|
} |
||||
|
|
||||
|
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") |
||||
|
for { |
||||
|
err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
||||
|
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) |
||||
|
if err != nil { |
||||
|
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) |
||||
|
} |
||||
|
glog.V(0).Infof("IAM read filer configuration: %s", resp) |
||||
|
return nil |
||||
|
}) |
||||
|
if err != nil { |
||||
|
glog.V(0).Infof("wait to connect to filer %s grpc address %s", *iamopt.filer, filerGrpcAddress) |
||||
|
time.Sleep(time.Second) |
||||
|
} else { |
||||
|
glog.V(0).Infof("connected to filer %s grpc address %s", *iamopt.filer, filerGrpcAddress) |
||||
|
break |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
router := mux.NewRouter().SkipClean(true) |
||||
|
_, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{ |
||||
|
Filer: *iamopt.filer, |
||||
|
Port: *iamopt.port, |
||||
|
FilerGrpcAddress: filerGrpcAddress, |
||||
|
GrpcDialOption: grpcDialOption, |
||||
|
}) |
||||
|
glog.V(0).Info("NewIamApiServer created") |
||||
|
if iamApiServer_err != nil { |
||||
|
glog.Fatalf("IAM API Server startup error: %v", iamApiServer_err) |
||||
|
} |
||||
|
|
||||
|
httpS := &http.Server{Handler: router} |
||||
|
|
||||
|
listenAddress := fmt.Sprintf(":%d", *iamopt.port) |
||||
|
iamApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second) |
||||
|
if err != nil { |
||||
|
glog.Fatalf("IAM API Server listener on %s error: %v", listenAddress, err) |
||||
|
} |
||||
|
|
||||
|
glog.V(0).Infof("Start Seaweed IAM API Server %s at http port %d", util.Version(), *iamopt.port) |
||||
|
if err = httpS.Serve(iamApiListener); err != nil { |
||||
|
glog.Fatalf("IAM API Server Fail to serve: %v", err) |
||||
|
} |
||||
|
|
||||
|
return true |
||||
|
} |
@ -0,0 +1,105 @@ |
|||||
|
package iamapi |
||||
|
|
||||
|
import ( |
||||
|
"bytes" |
||||
|
"encoding/xml" |
||||
|
"fmt" |
||||
|
"strconv" |
||||
|
|
||||
|
"net/http" |
||||
|
"net/url" |
||||
|
"time" |
||||
|
|
||||
|
"github.com/chrislusf/seaweedfs/weed/glog" |
||||
|
"github.com/chrislusf/seaweedfs/weed/s3api/s3err" |
||||
|
|
||||
|
"github.com/aws/aws-sdk-go/service/iam" |
||||
|
) |
||||
|
|
||||
|
type mimeType string |
||||
|
|
||||
|
const ( |
||||
|
mimeNone mimeType = "" |
||||
|
mimeXML mimeType = "application/xml" |
||||
|
) |
||||
|
|
||||
|
func setCommonHeaders(w http.ResponseWriter) { |
||||
|
w.Header().Set("x-amz-request-id", fmt.Sprintf("%d", time.Now().UnixNano())) |
||||
|
w.Header().Set("Accept-Ranges", "bytes") |
||||
|
} |
||||
|
|
||||
|
// Encodes the response headers into XML format.
|
||||
|
func encodeResponse(response interface{}) []byte { |
||||
|
var bytesBuffer bytes.Buffer |
||||
|
bytesBuffer.WriteString(xml.Header) |
||||
|
e := xml.NewEncoder(&bytesBuffer) |
||||
|
e.Encode(response) |
||||
|
return bytesBuffer.Bytes() |
||||
|
} |
||||
|
|
||||
|
// If none of the http routes match respond with MethodNotAllowed
|
||||
|
func notFoundHandler(w http.ResponseWriter, r *http.Request) { |
||||
|
glog.V(0).Infof("unsupported %s %s", r.Method, r.RequestURI) |
||||
|
writeErrorResponse(w, s3err.ErrMethodNotAllowed, r.URL) |
||||
|
} |
||||
|
|
||||
|
func writeErrorResponse(w http.ResponseWriter, errorCode s3err.ErrorCode, reqURL *url.URL) { |
||||
|
apiError := s3err.GetAPIError(errorCode) |
||||
|
errorResponse := getRESTErrorResponse(apiError, reqURL.Path) |
||||
|
encodedErrorResponse := encodeResponse(errorResponse) |
||||
|
writeResponse(w, apiError.HTTPStatusCode, encodedErrorResponse, mimeXML) |
||||
|
} |
||||
|
|
||||
|
func writeIamErrorResponse(w http.ResponseWriter, err error, object string, value string, msg error) { |
||||
|
errCode := err.Error() |
||||
|
errorResp := ErrorResponse{} |
||||
|
errorResp.Error.Type = "Sender" |
||||
|
errorResp.Error.Code = &errCode |
||||
|
if msg != nil { |
||||
|
errMsg := msg.Error() |
||||
|
errorResp.Error.Message = &errMsg |
||||
|
} |
||||
|
glog.Errorf("Response %+v", err) |
||||
|
switch errCode { |
||||
|
case iam.ErrCodeNoSuchEntityException: |
||||
|
msg := fmt.Sprintf("The %s with name %s cannot be found.", object, value) |
||||
|
errorResp.Error.Message = &msg |
||||
|
writeResponse(w, http.StatusNotFound, encodeResponse(errorResp), mimeXML) |
||||
|
case iam.ErrCodeServiceFailureException: |
||||
|
writeResponse(w, http.StatusInternalServerError, encodeResponse(errorResp), mimeXML) |
||||
|
default: |
||||
|
writeResponse(w, http.StatusInternalServerError, encodeResponse(errorResp), mimeXML) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func getRESTErrorResponse(err s3err.APIError, resource string) s3err.RESTErrorResponse { |
||||
|
return s3err.RESTErrorResponse{ |
||||
|
Code: err.Code, |
||||
|
Message: err.Description, |
||||
|
Resource: resource, |
||||
|
RequestID: fmt.Sprintf("%d", time.Now().UnixNano()), |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func writeResponse(w http.ResponseWriter, statusCode int, response []byte, mType mimeType) { |
||||
|
setCommonHeaders(w) |
||||
|
if response != nil { |
||||
|
w.Header().Set("Content-Length", strconv.Itoa(len(response))) |
||||
|
} |
||||
|
if mType != mimeNone { |
||||
|
w.Header().Set("Content-Type", string(mType)) |
||||
|
} |
||||
|
w.WriteHeader(statusCode) |
||||
|
if response != nil { |
||||
|
glog.V(4).Infof("status %d %s: %s", statusCode, mType, string(response)) |
||||
|
_, err := w.Write(response) |
||||
|
if err != nil { |
||||
|
glog.V(0).Infof("write err: %v", err) |
||||
|
} |
||||
|
w.(http.Flusher).Flush() |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func writeSuccessResponseXML(w http.ResponseWriter, response []byte) { |
||||
|
writeResponse(w, http.StatusOK, response, mimeXML) |
||||
|
} |
@ -0,0 +1,453 @@ |
|||||
|
package iamapi |
||||
|
|
||||
|
import ( |
||||
|
"crypto/sha1" |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"github.com/chrislusf/seaweedfs/weed/glog" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" |
||||
|
"github.com/chrislusf/seaweedfs/weed/s3api/s3err" |
||||
|
"math/rand" |
||||
|
"net/http" |
||||
|
"net/url" |
||||
|
"reflect" |
||||
|
"strings" |
||||
|
"sync" |
||||
|
"time" |
||||
|
|
||||
|
"github.com/aws/aws-sdk-go/service/iam" |
||||
|
) |
||||
|
|
||||
|
const ( |
||||
|
charsetUpper = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" |
||||
|
charset = charsetUpper + "abcdefghijklmnopqrstuvwxyz/" |
||||
|
policyDocumentVersion = "2012-10-17" |
||||
|
StatementActionAdmin = "*" |
||||
|
StatementActionWrite = "Put*" |
||||
|
StatementActionRead = "Get*" |
||||
|
StatementActionList = "List*" |
||||
|
StatementActionTagging = "Tagging*" |
||||
|
) |
||||
|
|
||||
|
var ( |
||||
|
seededRand *rand.Rand = rand.New( |
||||
|
rand.NewSource(time.Now().UnixNano())) |
||||
|
policyDocuments = map[string]*PolicyDocument{} |
||||
|
policyLock = sync.RWMutex{} |
||||
|
) |
||||
|
|
||||
|
func MapToStatementAction(action string) string { |
||||
|
switch action { |
||||
|
case StatementActionAdmin: |
||||
|
return s3_constants.ACTION_ADMIN |
||||
|
case StatementActionWrite: |
||||
|
return s3_constants.ACTION_WRITE |
||||
|
case StatementActionRead: |
||||
|
return s3_constants.ACTION_READ |
||||
|
case StatementActionList: |
||||
|
return s3_constants.ACTION_LIST |
||||
|
case StatementActionTagging: |
||||
|
return s3_constants.ACTION_TAGGING |
||||
|
default: |
||||
|
return "" |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func MapToIdentitiesAction(action string) string { |
||||
|
switch action { |
||||
|
case s3_constants.ACTION_ADMIN: |
||||
|
return StatementActionAdmin |
||||
|
case s3_constants.ACTION_WRITE: |
||||
|
return StatementActionWrite |
||||
|
case s3_constants.ACTION_READ: |
||||
|
return StatementActionRead |
||||
|
case s3_constants.ACTION_LIST: |
||||
|
return StatementActionList |
||||
|
case s3_constants.ACTION_TAGGING: |
||||
|
return StatementActionTagging |
||||
|
default: |
||||
|
return "" |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
type Statement struct { |
||||
|
Effect string `json:"Effect"` |
||||
|
Action []string `json:"Action"` |
||||
|
Resource []string `json:"Resource"` |
||||
|
} |
||||
|
|
||||
|
type Policies struct { |
||||
|
Policies map[string]PolicyDocument `json:"policies"` |
||||
|
} |
||||
|
|
||||
|
type PolicyDocument struct { |
||||
|
Version string `json:"Version"` |
||||
|
Statement []*Statement `json:"Statement"` |
||||
|
} |
||||
|
|
||||
|
func (p PolicyDocument) String() string { |
||||
|
b, _ := json.Marshal(p) |
||||
|
return string(b) |
||||
|
} |
||||
|
|
||||
|
func Hash(s *string) string { |
||||
|
h := sha1.New() |
||||
|
h.Write([]byte(*s)) |
||||
|
return fmt.Sprintf("%x", h.Sum(nil)) |
||||
|
} |
||||
|
|
||||
|
func StringWithCharset(length int, charset string) string { |
||||
|
b := make([]byte, length) |
||||
|
for i := range b { |
||||
|
b[i] = charset[seededRand.Intn(len(charset))] |
||||
|
} |
||||
|
return string(b) |
||||
|
} |
||||
|
|
||||
|
func (iama *IamApiServer) ListUsers(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (resp ListUsersResponse) { |
||||
|
for _, ident := range s3cfg.Identities { |
||||
|
resp.ListUsersResult.Users = append(resp.ListUsersResult.Users, &iam.User{UserName: &ident.Name}) |
||||
|
} |
||||
|
return resp |
||||
|
} |
||||
|
|
||||
|
func (iama *IamApiServer) ListAccessKeys(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (resp ListAccessKeysResponse) { |
||||
|
status := iam.StatusTypeActive |
||||
|
userName := values.Get("UserName") |
||||
|
for _, ident := range s3cfg.Identities { |
||||
|
if userName != "" && userName != ident.Name { |
||||
|
continue |
||||
|
} |
||||
|
for _, cred := range ident.Credentials { |
||||
|
resp.ListAccessKeysResult.AccessKeyMetadata = append(resp.ListAccessKeysResult.AccessKeyMetadata, |
||||
|
&iam.AccessKeyMetadata{UserName: &ident.Name, AccessKeyId: &cred.AccessKey, Status: &status}, |
||||
|
) |
||||
|
} |
||||
|
} |
||||
|
return resp |
||||
|
} |
||||
|
|
||||
|
func (iama *IamApiServer) CreateUser(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (resp CreateUserResponse) { |
||||
|
userName := values.Get("UserName") |
||||
|
resp.CreateUserResult.User.UserName = &userName |
||||
|
s3cfg.Identities = append(s3cfg.Identities, &iam_pb.Identity{Name: userName}) |
||||
|
return resp |
||||
|
} |
||||
|
|
||||
|
func (iama *IamApiServer) DeleteUser(s3cfg *iam_pb.S3ApiConfiguration, userName string) (resp DeleteUserResponse, err error) { |
||||
|
for i, ident := range s3cfg.Identities { |
||||
|
if userName == ident.Name { |
||||
|
s3cfg.Identities = append(s3cfg.Identities[:i], s3cfg.Identities[i+1:]...) |
||||
|
return resp, nil |
||||
|
} |
||||
|
} |
||||
|
return resp, fmt.Errorf(iam.ErrCodeNoSuchEntityException) |
||||
|
} |
||||
|
|
||||
|
func (iama *IamApiServer) GetUser(s3cfg *iam_pb.S3ApiConfiguration, userName string) (resp GetUserResponse, err error) { |
||||
|
for _, ident := range s3cfg.Identities { |
||||
|
if userName == ident.Name { |
||||
|
resp.GetUserResult.User = iam.User{UserName: &ident.Name} |
||||
|
return resp, nil |
||||
|
} |
||||
|
} |
||||
|
return resp, fmt.Errorf(iam.ErrCodeNoSuchEntityException) |
||||
|
} |
||||
|
|
||||
|
func GetPolicyDocument(policy *string) (policyDocument PolicyDocument, err error) { |
||||
|
if err = json.Unmarshal([]byte(*policy), &policyDocument); err != nil { |
||||
|
return PolicyDocument{}, err |
||||
|
} |
||||
|
return policyDocument, err |
||||
|
} |
||||
|
|
||||
|
func (iama *IamApiServer) CreatePolicy(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (resp CreatePolicyResponse, err error) { |
||||
|
policyName := values.Get("PolicyName") |
||||
|
policyDocumentString := values.Get("PolicyDocument") |
||||
|
policyDocument, err := GetPolicyDocument(&policyDocumentString) |
||||
|
if err != nil { |
||||
|
return CreatePolicyResponse{}, err |
||||
|
} |
||||
|
policyId := Hash(&policyDocumentString) |
||||
|
arn := fmt.Sprintf("arn:aws:iam:::policy/%s", policyName) |
||||
|
resp.CreatePolicyResult.Policy.PolicyName = &policyName |
||||
|
resp.CreatePolicyResult.Policy.Arn = &arn |
||||
|
resp.CreatePolicyResult.Policy.PolicyId = &policyId |
||||
|
policies := Policies{} |
||||
|
policyLock.Lock() |
||||
|
defer policyLock.Unlock() |
||||
|
if err = iama.s3ApiConfig.GetPolicies(&policies); err != nil { |
||||
|
return resp, err |
||||
|
} |
||||
|
policies.Policies[policyName] = policyDocument |
||||
|
if err = iama.s3ApiConfig.PutPolicies(&policies); err != nil { |
||||
|
return resp, err |
||||
|
} |
||||
|
return resp, nil |
||||
|
} |
||||
|
|
||||
|
func (iama *IamApiServer) PutUserPolicy(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (resp PutUserPolicyResponse, err error) { |
||||
|
userName := values.Get("UserName") |
||||
|
policyName := values.Get("PolicyName") |
||||
|
policyDocumentString := values.Get("PolicyDocument") |
||||
|
policyDocument, err := GetPolicyDocument(&policyDocumentString) |
||||
|
if err != nil { |
||||
|
return PutUserPolicyResponse{}, err |
||||
|
} |
||||
|
policyDocuments[policyName] = &policyDocument |
||||
|
actions := GetActions(&policyDocument) |
||||
|
for _, ident := range s3cfg.Identities { |
||||
|
if userName == ident.Name { |
||||
|
for _, action := range actions { |
||||
|
ident.Actions = append(ident.Actions, action) |
||||
|
} |
||||
|
break |
||||
|
} |
||||
|
} |
||||
|
return resp, nil |
||||
|
} |
||||
|
|
||||
|
func (iama *IamApiServer) GetUserPolicy(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (resp GetUserPolicyResponse, err error) { |
||||
|
userName := values.Get("UserName") |
||||
|
policyName := values.Get("PolicyName") |
||||
|
for _, ident := range s3cfg.Identities { |
||||
|
if userName != ident.Name { |
||||
|
continue |
||||
|
} |
||||
|
|
||||
|
resp.GetUserPolicyResult.UserName = userName |
||||
|
resp.GetUserPolicyResult.PolicyName = policyName |
||||
|
if len(ident.Actions) == 0 { |
||||
|
return resp, fmt.Errorf(iam.ErrCodeNoSuchEntityException) |
||||
|
} |
||||
|
|
||||
|
policyDocument := PolicyDocument{Version: policyDocumentVersion} |
||||
|
statements := make(map[string][]string) |
||||
|
for _, action := range ident.Actions { |
||||
|
// parse "Read:EXAMPLE-BUCKET"
|
||||
|
act := strings.Split(action, ":") |
||||
|
|
||||
|
resource := "*" |
||||
|
if len(act) == 2 { |
||||
|
resource = fmt.Sprintf("arn:aws:s3:::%s/*", act[1]) |
||||
|
} |
||||
|
statements[resource] = append(statements[resource], |
||||
|
fmt.Sprintf("s3:%s", MapToIdentitiesAction(act[0])), |
||||
|
) |
||||
|
} |
||||
|
for resource, actions := range statements { |
||||
|
isEqAction := false |
||||
|
for i, statement := range policyDocument.Statement { |
||||
|
if reflect.DeepEqual(statement.Action, actions) { |
||||
|
policyDocument.Statement[i].Resource = append( |
||||
|
policyDocument.Statement[i].Resource, resource) |
||||
|
isEqAction = true |
||||
|
break |
||||
|
} |
||||
|
} |
||||
|
if isEqAction { |
||||
|
continue |
||||
|
} |
||||
|
policyDocumentStatement := Statement{ |
||||
|
Effect: "Allow", |
||||
|
Action: actions, |
||||
|
} |
||||
|
policyDocumentStatement.Resource = append(policyDocumentStatement.Resource, resource) |
||||
|
policyDocument.Statement = append(policyDocument.Statement, &policyDocumentStatement) |
||||
|
} |
||||
|
resp.GetUserPolicyResult.PolicyDocument = policyDocument.String() |
||||
|
return resp, nil |
||||
|
} |
||||
|
return resp, fmt.Errorf(iam.ErrCodeNoSuchEntityException) |
||||
|
} |
||||
|
|
||||
|
func (iama *IamApiServer) DeleteUserPolicy(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (resp PutUserPolicyResponse, err error) { |
||||
|
userName := values.Get("UserName") |
||||
|
for i, ident := range s3cfg.Identities { |
||||
|
if ident.Name == userName { |
||||
|
s3cfg.Identities = append(s3cfg.Identities[:i], s3cfg.Identities[i+1:]...) |
||||
|
return resp, nil |
||||
|
} |
||||
|
} |
||||
|
return resp, fmt.Errorf(iam.ErrCodeNoSuchEntityException) |
||||
|
} |
||||
|
|
||||
|
func GetActions(policy *PolicyDocument) (actions []string) { |
||||
|
for _, statement := range policy.Statement { |
||||
|
if statement.Effect != "Allow" { |
||||
|
continue |
||||
|
} |
||||
|
for _, resource := range statement.Resource { |
||||
|
// Parse "arn:aws:s3:::my-bucket/shared/*"
|
||||
|
res := strings.Split(resource, ":") |
||||
|
if len(res) != 6 || res[0] != "arn" || res[1] != "aws" || res[2] != "s3" { |
||||
|
glog.Infof("not match resource: %s", res) |
||||
|
continue |
||||
|
} |
||||
|
for _, action := range statement.Action { |
||||
|
// Parse "s3:Get*"
|
||||
|
act := strings.Split(action, ":") |
||||
|
if len(act) != 2 || act[0] != "s3" { |
||||
|
glog.Infof("not match action: %s", act) |
||||
|
continue |
||||
|
} |
||||
|
statementAction := MapToStatementAction(act[1]) |
||||
|
if res[5] == "*" { |
||||
|
actions = append(actions, statementAction) |
||||
|
continue |
||||
|
} |
||||
|
// Parse my-bucket/shared/*
|
||||
|
path := strings.Split(res[5], "/") |
||||
|
if len(path) != 2 || path[1] != "*" { |
||||
|
glog.Infof("not match bucket: %s", path) |
||||
|
continue |
||||
|
} |
||||
|
actions = append(actions, fmt.Sprintf("%s:%s", statementAction, path[0])) |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
return actions |
||||
|
} |
||||
|
|
||||
|
func (iama *IamApiServer) CreateAccessKey(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (resp CreateAccessKeyResponse) { |
||||
|
userName := values.Get("UserName") |
||||
|
status := iam.StatusTypeActive |
||||
|
accessKeyId := StringWithCharset(21, charsetUpper) |
||||
|
secretAccessKey := StringWithCharset(42, charset) |
||||
|
resp.CreateAccessKeyResult.AccessKey.AccessKeyId = &accessKeyId |
||||
|
resp.CreateAccessKeyResult.AccessKey.SecretAccessKey = &secretAccessKey |
||||
|
resp.CreateAccessKeyResult.AccessKey.UserName = &userName |
||||
|
resp.CreateAccessKeyResult.AccessKey.Status = &status |
||||
|
changed := false |
||||
|
for _, ident := range s3cfg.Identities { |
||||
|
if userName == ident.Name { |
||||
|
ident.Credentials = append(ident.Credentials, |
||||
|
&iam_pb.Credential{AccessKey: accessKeyId, SecretKey: secretAccessKey}) |
||||
|
changed = true |
||||
|
break |
||||
|
} |
||||
|
} |
||||
|
if !changed { |
||||
|
s3cfg.Identities = append(s3cfg.Identities, |
||||
|
&iam_pb.Identity{Name: userName, |
||||
|
Credentials: []*iam_pb.Credential{ |
||||
|
{ |
||||
|
AccessKey: accessKeyId, |
||||
|
SecretKey: secretAccessKey, |
||||
|
}, |
||||
|
}, |
||||
|
}, |
||||
|
) |
||||
|
} |
||||
|
return resp |
||||
|
} |
||||
|
|
||||
|
func (iama *IamApiServer) DeleteAccessKey(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (resp DeleteAccessKeyResponse) { |
||||
|
userName := values.Get("UserName") |
||||
|
accessKeyId := values.Get("AccessKeyId") |
||||
|
for _, ident := range s3cfg.Identities { |
||||
|
if userName == ident.Name { |
||||
|
for i, cred := range ident.Credentials { |
||||
|
if cred.AccessKey == accessKeyId { |
||||
|
ident.Credentials = append(ident.Credentials[:i], ident.Credentials[i+1:]...) |
||||
|
break |
||||
|
} |
||||
|
} |
||||
|
break |
||||
|
} |
||||
|
} |
||||
|
return resp |
||||
|
} |
||||
|
|
||||
|
func (iama *IamApiServer) DoActions(w http.ResponseWriter, r *http.Request) { |
||||
|
if err := r.ParseForm(); err != nil { |
||||
|
writeErrorResponse(w, s3err.ErrInvalidRequest, r.URL) |
||||
|
return |
||||
|
} |
||||
|
values := r.PostForm |
||||
|
var s3cfgLock sync.RWMutex |
||||
|
s3cfgLock.RLock() |
||||
|
s3cfg := &iam_pb.S3ApiConfiguration{} |
||||
|
if err := iama.s3ApiConfig.GetS3ApiConfiguration(s3cfg); err != nil { |
||||
|
writeErrorResponse(w, s3err.ErrInternalError, r.URL) |
||||
|
return |
||||
|
} |
||||
|
s3cfgLock.RUnlock() |
||||
|
|
||||
|
glog.V(4).Infof("DoActions: %+v", values) |
||||
|
var response interface{} |
||||
|
var err error |
||||
|
changed := true |
||||
|
switch r.Form.Get("Action") { |
||||
|
case "ListUsers": |
||||
|
response = iama.ListUsers(s3cfg, values) |
||||
|
changed = false |
||||
|
case "ListAccessKeys": |
||||
|
response = iama.ListAccessKeys(s3cfg, values) |
||||
|
changed = false |
||||
|
case "CreateUser": |
||||
|
response = iama.CreateUser(s3cfg, values) |
||||
|
case "GetUser": |
||||
|
userName := values.Get("UserName") |
||||
|
response, err = iama.GetUser(s3cfg, userName) |
||||
|
if err != nil { |
||||
|
writeIamErrorResponse(w, err, "user", userName, nil) |
||||
|
return |
||||
|
} |
||||
|
changed = false |
||||
|
case "DeleteUser": |
||||
|
userName := values.Get("UserName") |
||||
|
response, err = iama.DeleteUser(s3cfg, userName) |
||||
|
if err != nil { |
||||
|
writeIamErrorResponse(w, err, "user", userName, nil) |
||||
|
return |
||||
|
} |
||||
|
case "CreateAccessKey": |
||||
|
response = iama.CreateAccessKey(s3cfg, values) |
||||
|
case "DeleteAccessKey": |
||||
|
response = iama.DeleteAccessKey(s3cfg, values) |
||||
|
case "CreatePolicy": |
||||
|
response, err = iama.CreatePolicy(s3cfg, values) |
||||
|
if err != nil { |
||||
|
glog.Errorf("CreatePolicy: %+v", err) |
||||
|
writeErrorResponse(w, s3err.ErrInvalidRequest, r.URL) |
||||
|
return |
||||
|
} |
||||
|
case "PutUserPolicy": |
||||
|
response, err = iama.PutUserPolicy(s3cfg, values) |
||||
|
if err != nil { |
||||
|
glog.Errorf("PutUserPolicy: %+v", err) |
||||
|
writeErrorResponse(w, s3err.ErrInvalidRequest, r.URL) |
||||
|
return |
||||
|
} |
||||
|
case "GetUserPolicy": |
||||
|
response, err = iama.GetUserPolicy(s3cfg, values) |
||||
|
if err != nil { |
||||
|
writeIamErrorResponse(w, err, "user", values.Get("UserName"), nil) |
||||
|
return |
||||
|
} |
||||
|
changed = false |
||||
|
case "DeleteUserPolicy": |
||||
|
if response, err = iama.DeleteUserPolicy(s3cfg, values); err != nil { |
||||
|
writeIamErrorResponse(w, err, "user", values.Get("UserName"), nil) |
||||
|
} |
||||
|
default: |
||||
|
errNotImplemented := s3err.GetAPIError(s3err.ErrNotImplemented) |
||||
|
errorResponse := ErrorResponse{} |
||||
|
errorResponse.Error.Code = &errNotImplemented.Code |
||||
|
errorResponse.Error.Message = &errNotImplemented.Description |
||||
|
writeResponse(w, errNotImplemented.HTTPStatusCode, encodeResponse(errorResponse), mimeXML) |
||||
|
return |
||||
|
} |
||||
|
if changed { |
||||
|
s3cfgLock.Lock() |
||||
|
err := iama.s3ApiConfig.PutS3ApiConfiguration(s3cfg) |
||||
|
s3cfgLock.Unlock() |
||||
|
if err != nil { |
||||
|
writeIamErrorResponse(w, fmt.Errorf(iam.ErrCodeServiceFailureException), "", "", err) |
||||
|
return |
||||
|
} |
||||
|
} |
||||
|
writeSuccessResponseXML(w, encodeResponse(response)) |
||||
|
} |
@ -0,0 +1,103 @@ |
|||||
|
package iamapi |
||||
|
|
||||
|
import ( |
||||
|
"encoding/xml" |
||||
|
"fmt" |
||||
|
"time" |
||||
|
|
||||
|
"github.com/aws/aws-sdk-go/service/iam" |
||||
|
) |
||||
|
|
||||
|
type CommonResponse struct { |
||||
|
ResponseMetadata struct { |
||||
|
RequestId string `xml:"RequestId"` |
||||
|
} `xml:"ResponseMetadata"` |
||||
|
} |
||||
|
|
||||
|
type ListUsersResponse struct { |
||||
|
CommonResponse |
||||
|
XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ ListUsersResponse"` |
||||
|
ListUsersResult struct { |
||||
|
Users []*iam.User `xml:"Users>member"` |
||||
|
IsTruncated bool `xml:"IsTruncated"` |
||||
|
} `xml:"ListUsersResult"` |
||||
|
} |
||||
|
|
||||
|
type ListAccessKeysResponse struct { |
||||
|
CommonResponse |
||||
|
XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ ListAccessKeysResponse"` |
||||
|
ListAccessKeysResult struct { |
||||
|
AccessKeyMetadata []*iam.AccessKeyMetadata `xml:"AccessKeyMetadata>member"` |
||||
|
IsTruncated bool `xml:"IsTruncated"` |
||||
|
} `xml:"ListAccessKeysResult"` |
||||
|
} |
||||
|
|
||||
|
type DeleteAccessKeyResponse struct { |
||||
|
CommonResponse |
||||
|
XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ DeleteAccessKeyResponse"` |
||||
|
} |
||||
|
|
||||
|
type CreatePolicyResponse struct { |
||||
|
CommonResponse |
||||
|
XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ CreatePolicyResponse"` |
||||
|
CreatePolicyResult struct { |
||||
|
Policy iam.Policy `xml:"Policy"` |
||||
|
} `xml:"CreatePolicyResult"` |
||||
|
} |
||||
|
|
||||
|
type CreateUserResponse struct { |
||||
|
CommonResponse |
||||
|
XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ CreateUserResponse"` |
||||
|
CreateUserResult struct { |
||||
|
User iam.User `xml:"User"` |
||||
|
} `xml:"CreateUserResult"` |
||||
|
} |
||||
|
|
||||
|
type DeleteUserResponse struct { |
||||
|
CommonResponse |
||||
|
XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ DeleteUserResponse"` |
||||
|
} |
||||
|
|
||||
|
type GetUserResponse struct { |
||||
|
CommonResponse |
||||
|
XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ GetUserResponse"` |
||||
|
GetUserResult struct { |
||||
|
User iam.User `xml:"User"` |
||||
|
} `xml:"GetUserResult"` |
||||
|
} |
||||
|
|
||||
|
type CreateAccessKeyResponse struct { |
||||
|
CommonResponse |
||||
|
XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ CreateAccessKeyResponse"` |
||||
|
CreateAccessKeyResult struct { |
||||
|
AccessKey iam.AccessKey `xml:"AccessKey"` |
||||
|
} `xml:"CreateAccessKeyResult"` |
||||
|
} |
||||
|
|
||||
|
type PutUserPolicyResponse struct { |
||||
|
CommonResponse |
||||
|
XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ PutUserPolicyResponse"` |
||||
|
} |
||||
|
|
||||
|
type GetUserPolicyResponse struct { |
||||
|
CommonResponse |
||||
|
XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ GetUserPolicyResponse"` |
||||
|
GetUserPolicyResult struct { |
||||
|
UserName string `xml:"UserName"` |
||||
|
PolicyName string `xml:"PolicyName"` |
||||
|
PolicyDocument string `xml:"PolicyDocument"` |
||||
|
} `xml:"GetUserPolicyResult"` |
||||
|
} |
||||
|
|
||||
|
type ErrorResponse struct { |
||||
|
CommonResponse |
||||
|
XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ ErrorResponse"` |
||||
|
Error struct { |
||||
|
iam.ErrorDetails |
||||
|
Type string `xml:"Type"` |
||||
|
} `xml:"Error"` |
||||
|
} |
||||
|
|
||||
|
func (r *CommonResponse) SetRequestId() { |
||||
|
r.ResponseMetadata.RequestId = fmt.Sprintf("%d", time.Now().UnixNano()) |
||||
|
} |
@ -0,0 +1,149 @@ |
|||||
|
package iamapi |
||||
|
|
||||
|
// https://docs.aws.amazon.com/cli/latest/reference/iam/list-roles.html
|
||||
|
|
||||
|
import ( |
||||
|
"bytes" |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"github.com/chrislusf/seaweedfs/weed/filer" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/s3api" |
||||
|
. "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" |
||||
|
"github.com/chrislusf/seaweedfs/weed/wdclient" |
||||
|
"github.com/gorilla/mux" |
||||
|
"google.golang.org/grpc" |
||||
|
"net/http" |
||||
|
"strings" |
||||
|
) |
||||
|
|
||||
|
type IamS3ApiConfig interface { |
||||
|
GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) |
||||
|
PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) |
||||
|
GetPolicies(policies *Policies) (err error) |
||||
|
PutPolicies(policies *Policies) (err error) |
||||
|
} |
||||
|
|
||||
|
type IamS3ApiConfigure struct { |
||||
|
option *IamServerOption |
||||
|
masterClient *wdclient.MasterClient |
||||
|
} |
||||
|
|
||||
|
type IamServerOption struct { |
||||
|
Masters string |
||||
|
Filer string |
||||
|
Port int |
||||
|
FilerGrpcAddress string |
||||
|
GrpcDialOption grpc.DialOption |
||||
|
} |
||||
|
|
||||
|
type IamApiServer struct { |
||||
|
s3ApiConfig IamS3ApiConfig |
||||
|
iam *s3api.IdentityAccessManagement |
||||
|
} |
||||
|
|
||||
|
var s3ApiConfigure IamS3ApiConfig |
||||
|
|
||||
|
func NewIamApiServer(router *mux.Router, option *IamServerOption) (iamApiServer *IamApiServer, err error) { |
||||
|
s3ApiConfigure = IamS3ApiConfigure{ |
||||
|
option: option, |
||||
|
masterClient: wdclient.NewMasterClient(option.GrpcDialOption, pb.AdminShellClient, "", 0, "", strings.Split(option.Masters, ",")), |
||||
|
} |
||||
|
s3Option := s3api.S3ApiServerOption{Filer: option.Filer} |
||||
|
iamApiServer = &IamApiServer{ |
||||
|
s3ApiConfig: s3ApiConfigure, |
||||
|
iam: s3api.NewIdentityAccessManagement(&s3Option), |
||||
|
} |
||||
|
|
||||
|
iamApiServer.registerRouter(router) |
||||
|
|
||||
|
return iamApiServer, nil |
||||
|
} |
||||
|
|
||||
|
func (iama *IamApiServer) registerRouter(router *mux.Router) { |
||||
|
// API Router
|
||||
|
apiRouter := router.PathPrefix("/").Subrouter() |
||||
|
// ListBuckets
|
||||
|
|
||||
|
// apiRouter.Methods("GET").Path("/").HandlerFunc(track(s3a.iam.Auth(s3a.ListBucketsHandler, ACTION_ADMIN), "LIST"))
|
||||
|
apiRouter.Methods("POST").Path("/").HandlerFunc(iama.iam.Auth(iama.DoActions, ACTION_ADMIN)) |
||||
|
//
|
||||
|
// NotFound
|
||||
|
apiRouter.NotFoundHandler = http.HandlerFunc(notFoundHandler) |
||||
|
} |
||||
|
|
||||
|
func (iam IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { |
||||
|
var buf bytes.Buffer |
||||
|
err = pb.WithGrpcFilerClient(iam.option.FilerGrpcAddress, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
||||
|
if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirecotry, filer.IamIdentityFile, &buf); err != nil { |
||||
|
return err |
||||
|
} |
||||
|
return nil |
||||
|
}) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
if buf.Len() > 0 { |
||||
|
if err = filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg); err != nil { |
||||
|
return err |
||||
|
} |
||||
|
} |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { |
||||
|
buf := bytes.Buffer{} |
||||
|
if err := filer.S3ConfigurationToText(&buf, s3cfg); err != nil { |
||||
|
return fmt.Errorf("S3ConfigurationToText: %s", err) |
||||
|
} |
||||
|
return pb.WithGrpcFilerClient( |
||||
|
iam.option.FilerGrpcAddress, |
||||
|
iam.option.GrpcDialOption, |
||||
|
func(client filer_pb.SeaweedFilerClient) error { |
||||
|
if err := filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes()); err != nil { |
||||
|
return err |
||||
|
} |
||||
|
return nil |
||||
|
}, |
||||
|
) |
||||
|
} |
||||
|
|
||||
|
func (iam IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) { |
||||
|
var buf bytes.Buffer |
||||
|
err = pb.WithGrpcFilerClient(iam.option.FilerGrpcAddress, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
||||
|
if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirecotry, filer.IamPoliciesFile, &buf); err != nil { |
||||
|
return err |
||||
|
} |
||||
|
return nil |
||||
|
}) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
if buf.Len() == 0 { |
||||
|
policies.Policies = make(map[string]PolicyDocument) |
||||
|
return nil |
||||
|
} |
||||
|
if err := json.Unmarshal(buf.Bytes(), policies); err != nil { |
||||
|
return err |
||||
|
} |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func (iam IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) { |
||||
|
var b []byte |
||||
|
if b, err = json.Marshal(policies); err != nil { |
||||
|
return err |
||||
|
} |
||||
|
return pb.WithGrpcFilerClient( |
||||
|
iam.option.FilerGrpcAddress, |
||||
|
iam.option.GrpcDialOption, |
||||
|
func(client filer_pb.SeaweedFilerClient) error { |
||||
|
if err := filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamPoliciesFile, b); err != nil { |
||||
|
return err |
||||
|
} |
||||
|
return nil |
||||
|
}, |
||||
|
) |
||||
|
} |
@ -0,0 +1,181 @@ |
|||||
|
package iamapi |
||||
|
|
||||
|
import ( |
||||
|
"encoding/xml" |
||||
|
"github.com/aws/aws-sdk-go/aws" |
||||
|
"github.com/aws/aws-sdk-go/aws/session" |
||||
|
"github.com/aws/aws-sdk-go/service/iam" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb" |
||||
|
"github.com/gorilla/mux" |
||||
|
"github.com/jinzhu/copier" |
||||
|
"github.com/stretchr/testify/assert" |
||||
|
"net/http" |
||||
|
"net/http/httptest" |
||||
|
"testing" |
||||
|
) |
||||
|
|
||||
|
var GetS3ApiConfiguration func(s3cfg *iam_pb.S3ApiConfiguration) (err error) |
||||
|
var PutS3ApiConfiguration func(s3cfg *iam_pb.S3ApiConfiguration) (err error) |
||||
|
var GetPolicies func(policies *Policies) (err error) |
||||
|
var PutPolicies func(policies *Policies) (err error) |
||||
|
|
||||
|
var s3config = iam_pb.S3ApiConfiguration{} |
||||
|
var policiesFile = Policies{Policies: make(map[string]PolicyDocument)} |
||||
|
var ias = IamApiServer{s3ApiConfig: iamS3ApiConfigureMock{}} |
||||
|
|
||||
|
type iamS3ApiConfigureMock struct{} |
||||
|
|
||||
|
func (iam iamS3ApiConfigureMock) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { |
||||
|
_ = copier.Copy(&s3cfg.Identities, &s3config.Identities) |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func (iam iamS3ApiConfigureMock) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { |
||||
|
_ = copier.Copy(&s3config.Identities, &s3cfg.Identities) |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func (iam iamS3ApiConfigureMock) GetPolicies(policies *Policies) (err error) { |
||||
|
_ = copier.Copy(&policies, &policiesFile) |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func (iam iamS3ApiConfigureMock) PutPolicies(policies *Policies) (err error) { |
||||
|
_ = copier.Copy(&policiesFile, &policies) |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func TestCreateUser(t *testing.T) { |
||||
|
userName := aws.String("Test") |
||||
|
params := &iam.CreateUserInput{UserName: userName} |
||||
|
req, _ := iam.New(session.New()).CreateUserRequest(params) |
||||
|
_ = req.Build() |
||||
|
out := CreateUserResponse{} |
||||
|
response, err := executeRequest(req.HTTPRequest, out) |
||||
|
assert.Equal(t, nil, err) |
||||
|
assert.Equal(t, http.StatusOK, response.Code) |
||||
|
//assert.Equal(t, out.XMLName, "lol")
|
||||
|
} |
||||
|
|
||||
|
func TestListUsers(t *testing.T) { |
||||
|
params := &iam.ListUsersInput{} |
||||
|
req, _ := iam.New(session.New()).ListUsersRequest(params) |
||||
|
_ = req.Build() |
||||
|
out := ListUsersResponse{} |
||||
|
response, err := executeRequest(req.HTTPRequest, out) |
||||
|
assert.Equal(t, nil, err) |
||||
|
assert.Equal(t, http.StatusOK, response.Code) |
||||
|
} |
||||
|
|
||||
|
func TestListAccessKeys(t *testing.T) { |
||||
|
svc := iam.New(session.New()) |
||||
|
params := &iam.ListAccessKeysInput{} |
||||
|
req, _ := svc.ListAccessKeysRequest(params) |
||||
|
_ = req.Build() |
||||
|
out := ListAccessKeysResponse{} |
||||
|
response, err := executeRequest(req.HTTPRequest, out) |
||||
|
assert.Equal(t, nil, err) |
||||
|
assert.Equal(t, http.StatusOK, response.Code) |
||||
|
} |
||||
|
|
||||
|
func TestGetUser(t *testing.T) { |
||||
|
userName := aws.String("Test") |
||||
|
params := &iam.GetUserInput{UserName: userName} |
||||
|
req, _ := iam.New(session.New()).GetUserRequest(params) |
||||
|
_ = req.Build() |
||||
|
out := GetUserResponse{} |
||||
|
response, err := executeRequest(req.HTTPRequest, out) |
||||
|
assert.Equal(t, nil, err) |
||||
|
assert.Equal(t, http.StatusOK, response.Code) |
||||
|
} |
||||
|
|
||||
|
// Todo flat statement
|
||||
|
func TestCreatePolicy(t *testing.T) { |
||||
|
params := &iam.CreatePolicyInput{ |
||||
|
PolicyName: aws.String("S3-read-only-example-bucket"), |
||||
|
PolicyDocument: aws.String(` |
||||
|
{ |
||||
|
"Version": "2012-10-17", |
||||
|
"Statement": [ |
||||
|
{ |
||||
|
"Effect": "Allow", |
||||
|
"Action": [ |
||||
|
"s3:Get*", |
||||
|
"s3:List*" |
||||
|
], |
||||
|
"Resource": [ |
||||
|
"arn:aws:s3:::EXAMPLE-BUCKET", |
||||
|
"arn:aws:s3:::EXAMPLE-BUCKET/*" |
||||
|
] |
||||
|
} |
||||
|
] |
||||
|
}`), |
||||
|
} |
||||
|
req, _ := iam.New(session.New()).CreatePolicyRequest(params) |
||||
|
_ = req.Build() |
||||
|
out := CreatePolicyResponse{} |
||||
|
response, err := executeRequest(req.HTTPRequest, out) |
||||
|
assert.Equal(t, nil, err) |
||||
|
assert.Equal(t, http.StatusOK, response.Code) |
||||
|
} |
||||
|
|
||||
|
func TestPutUserPolicy(t *testing.T) { |
||||
|
userName := aws.String("Test") |
||||
|
params := &iam.PutUserPolicyInput{ |
||||
|
UserName: userName, |
||||
|
PolicyName: aws.String("S3-read-only-example-bucket"), |
||||
|
PolicyDocument: aws.String( |
||||
|
`{ |
||||
|
"Version": "2012-10-17", |
||||
|
"Statement": [ |
||||
|
{ |
||||
|
"Effect": "Allow", |
||||
|
"Action": [ |
||||
|
"s3:Get*", |
||||
|
"s3:List*" |
||||
|
], |
||||
|
"Resource": [ |
||||
|
"arn:aws:s3:::EXAMPLE-BUCKET", |
||||
|
"arn:aws:s3:::EXAMPLE-BUCKET/*" |
||||
|
] |
||||
|
} |
||||
|
] |
||||
|
}`), |
||||
|
} |
||||
|
req, _ := iam.New(session.New()).PutUserPolicyRequest(params) |
||||
|
_ = req.Build() |
||||
|
out := PutUserPolicyResponse{} |
||||
|
response, err := executeRequest(req.HTTPRequest, out) |
||||
|
assert.Equal(t, nil, err) |
||||
|
assert.Equal(t, http.StatusOK, response.Code) |
||||
|
} |
||||
|
|
||||
|
func TestGetUserPolicy(t *testing.T) { |
||||
|
userName := aws.String("Test") |
||||
|
params := &iam.GetUserPolicyInput{UserName: userName, PolicyName: aws.String("S3-read-only-example-bucket")} |
||||
|
req, _ := iam.New(session.New()).GetUserPolicyRequest(params) |
||||
|
_ = req.Build() |
||||
|
out := GetUserPolicyResponse{} |
||||
|
response, err := executeRequest(req.HTTPRequest, out) |
||||
|
assert.Equal(t, nil, err) |
||||
|
assert.Equal(t, http.StatusOK, response.Code) |
||||
|
} |
||||
|
|
||||
|
func TestDeleteUser(t *testing.T) { |
||||
|
userName := aws.String("Test") |
||||
|
params := &iam.DeleteUserInput{UserName: userName} |
||||
|
req, _ := iam.New(session.New()).DeleteUserRequest(params) |
||||
|
_ = req.Build() |
||||
|
out := DeleteUserResponse{} |
||||
|
response, err := executeRequest(req.HTTPRequest, out) |
||||
|
assert.Equal(t, nil, err) |
||||
|
assert.Equal(t, http.StatusOK, response.Code) |
||||
|
} |
||||
|
|
||||
|
func executeRequest(req *http.Request, v interface{}) (*httptest.ResponseRecorder, error) { |
||||
|
rr := httptest.NewRecorder() |
||||
|
apiRouter := mux.NewRouter().SkipClean(true) |
||||
|
apiRouter.Path("/").Methods("POST").HandlerFunc(ias.DoActions) |
||||
|
apiRouter.ServeHTTP(rr, req) |
||||
|
return rr, xml.Unmarshal(rr.Body.Bytes(), &v) |
||||
|
} |
@ -0,0 +1,106 @@ |
|||||
|
package weed_server |
||||
|
|
||||
|
import ( |
||||
|
"github.com/chrislusf/seaweedfs/weed/operation" |
||||
|
"google.golang.org/grpc" |
||||
|
"math/rand" |
||||
|
"net/http" |
||||
|
|
||||
|
"github.com/chrislusf/seaweedfs/weed/util" |
||||
|
|
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/hbase" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/redis" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2" |
||||
|
"github.com/chrislusf/seaweedfs/weed/glog" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka" |
||||
|
_ "github.com/chrislusf/seaweedfs/weed/notification/log" |
||||
|
"github.com/chrislusf/seaweedfs/weed/security" |
||||
|
) |
||||
|
|
||||
|
type GatewayOption struct { |
||||
|
Masters []string |
||||
|
Filers []string |
||||
|
MaxMB int |
||||
|
IsSecure bool |
||||
|
} |
||||
|
|
||||
|
type GatewayServer struct { |
||||
|
option *GatewayOption |
||||
|
secret security.SigningKey |
||||
|
grpcDialOption grpc.DialOption |
||||
|
} |
||||
|
|
||||
|
func NewGatewayServer(defaultMux *http.ServeMux, option *GatewayOption) (fs *GatewayServer, err error) { |
||||
|
|
||||
|
fs = &GatewayServer{ |
||||
|
option: option, |
||||
|
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), |
||||
|
} |
||||
|
|
||||
|
if len(option.Masters) == 0 { |
||||
|
glog.Fatal("master list is required!") |
||||
|
} |
||||
|
|
||||
|
defaultMux.HandleFunc("/blobs/", fs.blobsHandler) |
||||
|
defaultMux.HandleFunc("/files/", fs.filesHandler) |
||||
|
defaultMux.HandleFunc("/topics/", fs.topicsHandler) |
||||
|
|
||||
|
return fs, nil |
||||
|
} |
||||
|
|
||||
|
func (fs *GatewayServer) getMaster() string { |
||||
|
randMaster := rand.Intn(len(fs.option.Masters)) |
||||
|
return fs.option.Masters[randMaster] |
||||
|
} |
||||
|
|
||||
|
func (fs *GatewayServer) blobsHandler(w http.ResponseWriter, r *http.Request) { |
||||
|
switch r.Method { |
||||
|
case "DELETE": |
||||
|
chunkId := r.URL.Path[len("/blobs/"):] |
||||
|
fullUrl, err := operation.LookupFileId(fs.getMaster, chunkId) |
||||
|
if err != nil { |
||||
|
writeJsonError(w, r, http.StatusNotFound, err) |
||||
|
return |
||||
|
} |
||||
|
var jwtAuthorization security.EncodedJwt |
||||
|
if fs.option.IsSecure { |
||||
|
jwtAuthorization = operation.LookupJwt(fs.getMaster(), chunkId) |
||||
|
} |
||||
|
body, statusCode, err := util.DeleteProxied(fullUrl, string(jwtAuthorization)) |
||||
|
if err != nil { |
||||
|
writeJsonError(w, r, http.StatusNotFound, err) |
||||
|
return |
||||
|
} |
||||
|
w.WriteHeader(statusCode) |
||||
|
w.Write(body) |
||||
|
case "POST": |
||||
|
submitForClientHandler(w, r, fs.getMaster, fs.grpcDialOption) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (fs *GatewayServer) filesHandler(w http.ResponseWriter, r *http.Request) { |
||||
|
switch r.Method { |
||||
|
case "DELETE": |
||||
|
case "POST": |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (fs *GatewayServer) topicsHandler(w http.ResponseWriter, r *http.Request) { |
||||
|
switch r.Method { |
||||
|
case "POST": |
||||
|
} |
||||
|
} |
Some files were not shown because too many files changed in this diff
Write
Preview
Loading…
Cancel
Save
Reference in new issue