From af313dff58bf82a731dbce72535b72f1979d6740 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 10 Apr 2021 23:47:47 -0700 Subject: [PATCH] add gateway for easier POST and DELETE blobs --- weed/command/command.go | 1 + weed/command/gateway.go | 93 +++++++++++++++++++++++++++++ weed/server/gateway_server.go | 106 ++++++++++++++++++++++++++++++++++ weed/util/http_util.go | 22 +++++++ 4 files changed, 222 insertions(+) create mode 100644 weed/command/gateway.go create mode 100644 weed/server/gateway_server.go diff --git a/weed/command/command.go b/weed/command/command.go index ce754702f..b6efcead2 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -22,6 +22,7 @@ var Commands = []*Command{ cmdFilerReplicate, cmdFilerSynchronize, cmdFix, + cmdGateway, cmdMaster, cmdMount, cmdS3, diff --git a/weed/command/gateway.go b/weed/command/gateway.go new file mode 100644 index 000000000..a2a97889f --- /dev/null +++ b/weed/command/gateway.go @@ -0,0 +1,93 @@ +package command + +import ( + "net/http" + "strconv" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + gatewayOptions GatewayOptions +) + +type GatewayOptions struct { + masters *string + filers *string + bindIp *string + port *int + maxMB *int +} + +func init() { + cmdGateway.Run = runGateway // break init cycle + gatewayOptions.masters = cmdGateway.Flag.String("master", "localhost:9333", "comma-separated master servers") + gatewayOptions.filers = cmdGateway.Flag.String("filer", "localhost:8888", "comma-separated filer servers") + gatewayOptions.bindIp = cmdGateway.Flag.String("ip.bind", "localhost", "ip address to bind to") + gatewayOptions.port = cmdGateway.Flag.Int("port", 5647, "gateway http listen port") + gatewayOptions.maxMB = cmdGateway.Flag.Int("maxMB", 4, "split files larger than the limit") +} + +var cmdGateway = &Command{ + UsageLine: "gateway -port=8888 -master=[,]* -filer=[,]*", + Short: "start a gateway server that points to a list of master servers or a list of filers", + Long: `start a gateway server which accepts REST operation to write any blobs, files, or topic messages. + + POST /blobs/ + return a chunk id + DELETE /blobs/ + delete a chunk id + + /* + POST /files/path/to/a/file + save /path/to/a/file on filer + DELETE /files/path/to/a/file + delete /path/to/a/file on filer + + POST /topics/topicName + save on filer to /topics/topicName//ts.json + */ +`, +} + +func runGateway(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + + gatewayOptions.startGateway() + + return true +} + +func (gw *GatewayOptions) startGateway() { + + defaultMux := http.NewServeMux() + + _, gws_err := weed_server.NewGatewayServer(defaultMux, &weed_server.GatewayOption{ + Masters: strings.Split(*gw.masters, ","), + Filers: strings.Split(*gw.filers, ","), + MaxMB: *gw.maxMB, + }) + if gws_err != nil { + glog.Fatalf("Gateway startup error: %v", gws_err) + } + + glog.V(0).Infof("Start Seaweed Gateway %s at %s:%d", util.Version(), *gw.bindIp, *gw.port) + gatewayListener, e := util.NewListener( + *gw.bindIp+":"+strconv.Itoa(*gw.port), + time.Duration(10)*time.Second, + ) + if e != nil { + glog.Fatalf("Filer listener error: %v", e) + } + + httpS := &http.Server{Handler: defaultMux} + if err := httpS.Serve(gatewayListener); err != nil { + glog.Fatalf("Gateway Fail to serve: %v", e) + } + +} diff --git a/weed/server/gateway_server.go b/weed/server/gateway_server.go new file mode 100644 index 000000000..608217ed7 --- /dev/null +++ b/weed/server/gateway_server.go @@ -0,0 +1,106 @@ +package weed_server + +import ( + "github.com/chrislusf/seaweedfs/weed/operation" + "google.golang.org/grpc" + "math/rand" + "net/http" + + "github.com/chrislusf/seaweedfs/weed/util" + + _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" + _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" + _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" + _ "github.com/chrislusf/seaweedfs/weed/filer/hbase" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3" + _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" + _ "github.com/chrislusf/seaweedfs/weed/filer/mysql" + _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" + "github.com/chrislusf/seaweedfs/weed/glog" + _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" + _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" + _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" + _ "github.com/chrislusf/seaweedfs/weed/notification/kafka" + _ "github.com/chrislusf/seaweedfs/weed/notification/log" + "github.com/chrislusf/seaweedfs/weed/security" +) + +type GatewayOption struct { + Masters []string + Filers []string + MaxMB int + IsSecure bool +} + +type GatewayServer struct { + option *GatewayOption + secret security.SigningKey + grpcDialOption grpc.DialOption +} + +func NewGatewayServer(defaultMux *http.ServeMux, option *GatewayOption) (fs *GatewayServer, err error) { + + fs = &GatewayServer{ + option: option, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), + } + + if len(option.Masters) == 0 { + glog.Fatal("master list is required!") + } + + defaultMux.HandleFunc("/blobs/", fs.blobsHandler) + defaultMux.HandleFunc("/files/", fs.filesHandler) + defaultMux.HandleFunc("/topics/", fs.topicsHandler) + + return fs, nil +} + +func (fs *GatewayServer) getMaster() string { + randMaster := rand.Intn(len(fs.option.Masters)) + return fs.option.Masters[randMaster] +} + +func (fs *GatewayServer) blobsHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "DELETE": + chunkId := r.URL.Path[len("/blobs/"):] + fullUrl, err := operation.LookupFileId(fs.getMaster, chunkId) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + var jwtAuthorization security.EncodedJwt + if fs.option.IsSecure { + jwtAuthorization = operation.LookupJwt(fs.getMaster(), chunkId) + } + body, statusCode, err := util.DeleteProxied(fullUrl, string(jwtAuthorization)) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + w.WriteHeader(statusCode) + w.Write(body) + case "POST": + submitForClientHandler(w, r, fs.getMaster, fs.grpcDialOption) + } +} + +func (fs *GatewayServer) filesHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "DELETE": + case "POST": + } +} + +func (fs *GatewayServer) topicsHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "POST": + } +} diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 135d10c45..1c1b2b377 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -124,6 +124,28 @@ func Delete(url string, jwt string) error { return errors.New(string(body)) } +func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) { + req, err := http.NewRequest("DELETE", url, nil) + if jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(jwt)) + } + if err != nil { + return + } + resp, err := client.Do(req) + if err != nil { + return + } + defer resp.Body.Close() + body, err = ioutil.ReadAll(resp.Body) + if err != nil { + return + } + httpStatus = resp.StatusCode + return +} + + func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error { r, err := client.PostForm(url, values) if err != nil {