Chris Lu
4 years ago
4 changed files with 222 additions and 0 deletions
-
1weed/command/command.go
-
93weed/command/gateway.go
-
106weed/server/gateway_server.go
-
22weed/util/http_util.go
@ -0,0 +1,93 @@ |
|||
package command |
|||
|
|||
import ( |
|||
"net/http" |
|||
"strconv" |
|||
"strings" |
|||
"time" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/server" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
var ( |
|||
gatewayOptions GatewayOptions |
|||
) |
|||
|
|||
type GatewayOptions struct { |
|||
masters *string |
|||
filers *string |
|||
bindIp *string |
|||
port *int |
|||
maxMB *int |
|||
} |
|||
|
|||
func init() { |
|||
cmdGateway.Run = runGateway // break init cycle
|
|||
gatewayOptions.masters = cmdGateway.Flag.String("master", "localhost:9333", "comma-separated master servers") |
|||
gatewayOptions.filers = cmdGateway.Flag.String("filer", "localhost:8888", "comma-separated filer servers") |
|||
gatewayOptions.bindIp = cmdGateway.Flag.String("ip.bind", "localhost", "ip address to bind to") |
|||
gatewayOptions.port = cmdGateway.Flag.Int("port", 5647, "gateway http listen port") |
|||
gatewayOptions.maxMB = cmdGateway.Flag.Int("maxMB", 4, "split files larger than the limit") |
|||
} |
|||
|
|||
var cmdGateway = &Command{ |
|||
UsageLine: "gateway -port=8888 -master=<ip:port>[,<ip:port>]* -filer=<ip:port>[,<ip:port>]*", |
|||
Short: "start a gateway server that points to a list of master servers or a list of filers", |
|||
Long: `start a gateway server which accepts REST operation to write any blobs, files, or topic messages. |
|||
|
|||
POST /blobs/ |
|||
return a chunk id |
|||
DELETE /blobs/<chunk_id> |
|||
delete a chunk id |
|||
|
|||
/* |
|||
POST /files/path/to/a/file |
|||
save /path/to/a/file on filer |
|||
DELETE /files/path/to/a/file |
|||
delete /path/to/a/file on filer |
|||
|
|||
POST /topics/topicName |
|||
save on filer to /topics/topicName/<ds>/ts.json |
|||
*/ |
|||
`, |
|||
} |
|||
|
|||
func runGateway(cmd *Command, args []string) bool { |
|||
|
|||
util.LoadConfiguration("security", false) |
|||
|
|||
gatewayOptions.startGateway() |
|||
|
|||
return true |
|||
} |
|||
|
|||
func (gw *GatewayOptions) startGateway() { |
|||
|
|||
defaultMux := http.NewServeMux() |
|||
|
|||
_, gws_err := weed_server.NewGatewayServer(defaultMux, &weed_server.GatewayOption{ |
|||
Masters: strings.Split(*gw.masters, ","), |
|||
Filers: strings.Split(*gw.filers, ","), |
|||
MaxMB: *gw.maxMB, |
|||
}) |
|||
if gws_err != nil { |
|||
glog.Fatalf("Gateway startup error: %v", gws_err) |
|||
} |
|||
|
|||
glog.V(0).Infof("Start Seaweed Gateway %s at %s:%d", util.Version(), *gw.bindIp, *gw.port) |
|||
gatewayListener, e := util.NewListener( |
|||
*gw.bindIp+":"+strconv.Itoa(*gw.port), |
|||
time.Duration(10)*time.Second, |
|||
) |
|||
if e != nil { |
|||
glog.Fatalf("Filer listener error: %v", e) |
|||
} |
|||
|
|||
httpS := &http.Server{Handler: defaultMux} |
|||
if err := httpS.Serve(gatewayListener); err != nil { |
|||
glog.Fatalf("Gateway Fail to serve: %v", e) |
|||
} |
|||
|
|||
} |
@ -0,0 +1,106 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/operation" |
|||
"google.golang.org/grpc" |
|||
"math/rand" |
|||
"net/http" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
|
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/hbase" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/redis" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" |
|||
_ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" |
|||
_ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" |
|||
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka" |
|||
_ "github.com/chrislusf/seaweedfs/weed/notification/log" |
|||
"github.com/chrislusf/seaweedfs/weed/security" |
|||
) |
|||
|
|||
type GatewayOption struct { |
|||
Masters []string |
|||
Filers []string |
|||
MaxMB int |
|||
IsSecure bool |
|||
} |
|||
|
|||
type GatewayServer struct { |
|||
option *GatewayOption |
|||
secret security.SigningKey |
|||
grpcDialOption grpc.DialOption |
|||
} |
|||
|
|||
func NewGatewayServer(defaultMux *http.ServeMux, option *GatewayOption) (fs *GatewayServer, err error) { |
|||
|
|||
fs = &GatewayServer{ |
|||
option: option, |
|||
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), |
|||
} |
|||
|
|||
if len(option.Masters) == 0 { |
|||
glog.Fatal("master list is required!") |
|||
} |
|||
|
|||
defaultMux.HandleFunc("/blobs/", fs.blobsHandler) |
|||
defaultMux.HandleFunc("/files/", fs.filesHandler) |
|||
defaultMux.HandleFunc("/topics/", fs.topicsHandler) |
|||
|
|||
return fs, nil |
|||
} |
|||
|
|||
func (fs *GatewayServer) getMaster() string { |
|||
randMaster := rand.Intn(len(fs.option.Masters)) |
|||
return fs.option.Masters[randMaster] |
|||
} |
|||
|
|||
func (fs *GatewayServer) blobsHandler(w http.ResponseWriter, r *http.Request) { |
|||
switch r.Method { |
|||
case "DELETE": |
|||
chunkId := r.URL.Path[len("/blobs/"):] |
|||
fullUrl, err := operation.LookupFileId(fs.getMaster, chunkId) |
|||
if err != nil { |
|||
writeJsonError(w, r, http.StatusNotFound, err) |
|||
return |
|||
} |
|||
var jwtAuthorization security.EncodedJwt |
|||
if fs.option.IsSecure { |
|||
jwtAuthorization = operation.LookupJwt(fs.getMaster(), chunkId) |
|||
} |
|||
body, statusCode, err := util.DeleteProxied(fullUrl, string(jwtAuthorization)) |
|||
if err != nil { |
|||
writeJsonError(w, r, http.StatusNotFound, err) |
|||
return |
|||
} |
|||
w.WriteHeader(statusCode) |
|||
w.Write(body) |
|||
case "POST": |
|||
submitForClientHandler(w, r, fs.getMaster, fs.grpcDialOption) |
|||
} |
|||
} |
|||
|
|||
func (fs *GatewayServer) filesHandler(w http.ResponseWriter, r *http.Request) { |
|||
switch r.Method { |
|||
case "DELETE": |
|||
case "POST": |
|||
} |
|||
} |
|||
|
|||
func (fs *GatewayServer) topicsHandler(w http.ResponseWriter, r *http.Request) { |
|||
switch r.Method { |
|||
case "POST": |
|||
} |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue