3 changed files with 0 additions and 200 deletions
@ -1,93 +0,0 @@ |
|||||
package command |
|
||||
|
|
||||
import ( |
|
||||
"net/http" |
|
||||
"strconv" |
|
||||
"strings" |
|
||||
"time" |
|
||||
|
|
||||
"github.com/chrislusf/seaweedfs/weed/glog" |
|
||||
"github.com/chrislusf/seaweedfs/weed/server" |
|
||||
"github.com/chrislusf/seaweedfs/weed/util" |
|
||||
) |
|
||||
|
|
||||
var ( |
|
||||
gatewayOptions GatewayOptions |
|
||||
) |
|
||||
|
|
||||
type GatewayOptions struct { |
|
||||
masters *string |
|
||||
filers *string |
|
||||
bindIp *string |
|
||||
port *int |
|
||||
maxMB *int |
|
||||
} |
|
||||
|
|
||||
func init() { |
|
||||
cmdGateway.Run = runGateway // break init cycle
|
|
||||
gatewayOptions.masters = cmdGateway.Flag.String("master", "localhost:9333", "comma-separated master servers") |
|
||||
gatewayOptions.filers = cmdGateway.Flag.String("filer", "localhost:8888", "comma-separated filer servers") |
|
||||
gatewayOptions.bindIp = cmdGateway.Flag.String("ip.bind", "localhost", "ip address to bind to") |
|
||||
gatewayOptions.port = cmdGateway.Flag.Int("port", 5647, "gateway http listen port") |
|
||||
gatewayOptions.maxMB = cmdGateway.Flag.Int("maxMB", 4, "split files larger than the limit") |
|
||||
} |
|
||||
|
|
||||
var cmdGateway = &Command{ |
|
||||
UsageLine: "gateway -port=8888 -master=<ip:port>[,<ip:port>]* -filer=<ip:port>[,<ip:port>]*", |
|
||||
Short: "start a gateway server that points to a list of master servers or a list of filers", |
|
||||
Long: `start a gateway server which accepts REST operation to write any blobs, files, or topic messages. |
|
||||
|
|
||||
POST /blobs/ |
|
||||
upload the blob and return a chunk id |
|
||||
DELETE /blobs/<chunk_id> |
|
||||
delete a chunk id |
|
||||
|
|
||||
/* |
|
||||
POST /files/path/to/a/file |
|
||||
save /path/to/a/file on filer |
|
||||
DELETE /files/path/to/a/file |
|
||||
delete /path/to/a/file on filer |
|
||||
|
|
||||
POST /topics/topicName |
|
||||
save on filer to /topics/topicName/<ds>/ts.json |
|
||||
*/ |
|
||||
`, |
|
||||
} |
|
||||
|
|
||||
func runGateway(cmd *Command, args []string) bool { |
|
||||
|
|
||||
util.LoadConfiguration("security", false) |
|
||||
|
|
||||
gatewayOptions.startGateway() |
|
||||
|
|
||||
return true |
|
||||
} |
|
||||
|
|
||||
func (gw *GatewayOptions) startGateway() { |
|
||||
|
|
||||
defaultMux := http.NewServeMux() |
|
||||
|
|
||||
_, gws_err := weed_server.NewGatewayServer(defaultMux, &weed_server.GatewayOption{ |
|
||||
Masters: strings.Split(*gw.masters, ","), |
|
||||
Filers: strings.Split(*gw.filers, ","), |
|
||||
MaxMB: *gw.maxMB, |
|
||||
}) |
|
||||
if gws_err != nil { |
|
||||
glog.Fatalf("Gateway startup error: %v", gws_err) |
|
||||
} |
|
||||
|
|
||||
glog.V(0).Infof("Start Seaweed Gateway %s at %s:%d", util.Version(), *gw.bindIp, *gw.port) |
|
||||
gatewayListener, e := util.NewListener( |
|
||||
*gw.bindIp+":"+strconv.Itoa(*gw.port), |
|
||||
time.Duration(10)*time.Second, |
|
||||
) |
|
||||
if e != nil { |
|
||||
glog.Fatalf("Filer listener error: %v", e) |
|
||||
} |
|
||||
|
|
||||
httpS := &http.Server{Handler: defaultMux} |
|
||||
if err := httpS.Serve(gatewayListener); err != nil { |
|
||||
glog.Fatalf("Gateway Fail to serve: %v", e) |
|
||||
} |
|
||||
|
|
||||
} |
|
@ -1,106 +0,0 @@ |
|||||
package weed_server |
|
||||
|
|
||||
import ( |
|
||||
"github.com/chrislusf/seaweedfs/weed/operation" |
|
||||
"google.golang.org/grpc" |
|
||||
"math/rand" |
|
||||
"net/http" |
|
||||
|
|
||||
"github.com/chrislusf/seaweedfs/weed/util" |
|
||||
|
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/hbase" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/redis" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2" |
|
||||
"github.com/chrislusf/seaweedfs/weed/glog" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka" |
|
||||
_ "github.com/chrislusf/seaweedfs/weed/notification/log" |
|
||||
"github.com/chrislusf/seaweedfs/weed/security" |
|
||||
) |
|
||||
|
|
||||
type GatewayOption struct { |
|
||||
Masters []string |
|
||||
Filers []string |
|
||||
MaxMB int |
|
||||
IsSecure bool |
|
||||
} |
|
||||
|
|
||||
type GatewayServer struct { |
|
||||
option *GatewayOption |
|
||||
secret security.SigningKey |
|
||||
grpcDialOption grpc.DialOption |
|
||||
} |
|
||||
|
|
||||
func NewGatewayServer(defaultMux *http.ServeMux, option *GatewayOption) (fs *GatewayServer, err error) { |
|
||||
|
|
||||
fs = &GatewayServer{ |
|
||||
option: option, |
|
||||
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), |
|
||||
} |
|
||||
|
|
||||
if len(option.Masters) == 0 { |
|
||||
glog.Fatal("master list is required!") |
|
||||
} |
|
||||
|
|
||||
defaultMux.HandleFunc("/blobs/", fs.blobsHandler) |
|
||||
defaultMux.HandleFunc("/files/", fs.filesHandler) |
|
||||
defaultMux.HandleFunc("/topics/", fs.topicsHandler) |
|
||||
|
|
||||
return fs, nil |
|
||||
} |
|
||||
|
|
||||
func (fs *GatewayServer) getMaster() string { |
|
||||
randMaster := rand.Intn(len(fs.option.Masters)) |
|
||||
return fs.option.Masters[randMaster] |
|
||||
} |
|
||||
|
|
||||
func (fs *GatewayServer) blobsHandler(w http.ResponseWriter, r *http.Request) { |
|
||||
switch r.Method { |
|
||||
case "DELETE": |
|
||||
chunkId := r.URL.Path[len("/blobs/"):] |
|
||||
fullUrl, err := operation.LookupFileId(fs.getMaster, chunkId) |
|
||||
if err != nil { |
|
||||
writeJsonError(w, r, http.StatusNotFound, err) |
|
||||
return |
|
||||
} |
|
||||
var jwtAuthorization security.EncodedJwt |
|
||||
if fs.option.IsSecure { |
|
||||
jwtAuthorization = operation.LookupJwt(fs.getMaster(), chunkId) |
|
||||
} |
|
||||
body, statusCode, err := util.DeleteProxied(fullUrl, string(jwtAuthorization)) |
|
||||
if err != nil { |
|
||||
writeJsonError(w, r, http.StatusNotFound, err) |
|
||||
return |
|
||||
} |
|
||||
w.WriteHeader(statusCode) |
|
||||
w.Write(body) |
|
||||
case "POST": |
|
||||
submitForClientHandler(w, r, fs.getMaster, fs.grpcDialOption) |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
func (fs *GatewayServer) filesHandler(w http.ResponseWriter, r *http.Request) { |
|
||||
switch r.Method { |
|
||||
case "DELETE": |
|
||||
case "POST": |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
func (fs *GatewayServer) topicsHandler(w http.ResponseWriter, r *http.Request) { |
|
||||
switch r.Method { |
|
||||
case "POST": |
|
||||
} |
|
||||
} |
|
Write
Preview
Loading…
Cancel
Save
Reference in new issue