Browse Source

Merge branch 'upstreamMaster' into iam_handlers

pull/1975/head
Konstantin Lebedev 4 years ago
parent
commit
8e02e138ea
  1. 2
      go.mod
  2. 2
      go.sum
  3. 4
      k8s/seaweedfs/Chart.yaml
  4. 2
      k8s/seaweedfs/values.yaml
  5. 1
      weed/command/command.go
  6. 93
      weed/command/gateway.go
  7. 1
      weed/filesys/dir.go
  8. 2
      weed/filesys/filehandle.go
  9. 2
      weed/operation/upload_content.go
  10. 170
      weed/server/filer_server_handlers_write_upload.go
  11. 106
      weed/server/gateway_server.go
  12. 2
      weed/util/constants.go
  13. 21
      weed/util/http_util.go

2
go.mod

@ -15,7 +15,7 @@ require (
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
github.com/bwmarrin/snowflake v0.3.0
github.com/cespare/xxhash v1.1.0
github.com/chrislusf/raft v1.0.5
github.com/chrislusf/raft v1.0.6
github.com/coreos/go-semver v0.3.0 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/disintegration/imaging v1.6.2

2
go.sum

@ -157,6 +157,8 @@ github.com/chrislusf/raft v1.0.4 h1:THhbsVik2hxdE0/VXX834f64Wn9RzgVPp+E+XCWZdKM=
github.com/chrislusf/raft v1.0.4/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
github.com/chrislusf/raft v1.0.5 h1:g8GxKCSStfm0/bGBDpNEbmEXL6MJkpXX+NI0ksbX5D4=
github.com/chrislusf/raft v1.0.5/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
github.com/chrislusf/raft v1.0.6 h1:wunb85WWhMKhNRn7EmdIw35D4Lmew0ZJv8oYDizR/+Y=
github.com/chrislusf/raft v1.0.6/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=

4
k8s/seaweedfs/Chart.yaml

@ -1,5 +1,5 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
appVersion: "2.38"
version: 2.38
appVersion: "2.39"
version: 2.39

2
k8s/seaweedfs/values.yaml

@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
# imageTag: "2.38" - started using {.Chart.appVersion}
# imageTag: "2.39" - started using {.Chart.appVersion}
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always

1
weed/command/command.go

@ -22,6 +22,7 @@ var Commands = []*Command{
cmdFilerReplicate,
cmdFilerSynchronize,
cmdFix,
cmdGateway,
cmdMaster,
cmdMount,
cmdS3,

93
weed/command/gateway.go

@ -0,0 +1,93 @@
package command
import (
"net/http"
"strconv"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
gatewayOptions GatewayOptions
)
type GatewayOptions struct {
masters *string
filers *string
bindIp *string
port *int
maxMB *int
}
func init() {
cmdGateway.Run = runGateway // break init cycle
gatewayOptions.masters = cmdGateway.Flag.String("master", "localhost:9333", "comma-separated master servers")
gatewayOptions.filers = cmdGateway.Flag.String("filer", "localhost:8888", "comma-separated filer servers")
gatewayOptions.bindIp = cmdGateway.Flag.String("ip.bind", "localhost", "ip address to bind to")
gatewayOptions.port = cmdGateway.Flag.Int("port", 5647, "gateway http listen port")
gatewayOptions.maxMB = cmdGateway.Flag.Int("maxMB", 4, "split files larger than the limit")
}
var cmdGateway = &Command{
UsageLine: "gateway -port=8888 -master=<ip:port>[,<ip:port>]* -filer=<ip:port>[,<ip:port>]*",
Short: "start a gateway server that points to a list of master servers or a list of filers",
Long: `start a gateway server which accepts REST operation to write any blobs, files, or topic messages.
POST /blobs/
upload the blob and return a chunk id
DELETE /blobs/<chunk_id>
delete a chunk id
/*
POST /files/path/to/a/file
save /path/to/a/file on filer
DELETE /files/path/to/a/file
delete /path/to/a/file on filer
POST /topics/topicName
save on filer to /topics/topicName/<ds>/ts.json
*/
`,
}
func runGateway(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
gatewayOptions.startGateway()
return true
}
func (gw *GatewayOptions) startGateway() {
defaultMux := http.NewServeMux()
_, gws_err := weed_server.NewGatewayServer(defaultMux, &weed_server.GatewayOption{
Masters: strings.Split(*gw.masters, ","),
Filers: strings.Split(*gw.filers, ","),
MaxMB: *gw.maxMB,
})
if gws_err != nil {
glog.Fatalf("Gateway startup error: %v", gws_err)
}
glog.V(0).Infof("Start Seaweed Gateway %s at %s:%d", util.Version(), *gw.bindIp, *gw.port)
gatewayListener, e := util.NewListener(
*gw.bindIp+":"+strconv.Itoa(*gw.port),
time.Duration(10)*time.Second,
)
if e != nil {
glog.Fatalf("Filer listener error: %v", e)
}
httpS := &http.Server{Handler: defaultMux}
if err := httpS.Serve(gatewayListener); err != nil {
glog.Fatalf("Gateway Fail to serve: %v", e)
}
}

1
weed/filesys/dir.go

@ -372,7 +372,6 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
return fuse.EPERM
}
if !req.Dir {
return dir.removeOneFile(req)
}

2
weed/filesys/filehandle.go

@ -200,6 +200,8 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
fh.Lock()
defer fh.Unlock()
fh.f.entryViewCache = nil
if fh.f.isOpen <= 0 {
glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
fh.f.isOpen = 0

2
weed/operation/upload_content.go

@ -235,7 +235,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
// print("+")
resp, post_err := HttpClient.Do(req)
if post_err != nil {
if !strings.Contains(post_err.Error(), "connection reset by peer"){
if !strings.Contains(post_err.Error(), "connection reset by peer") {
glog.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err)
debug.PrintStack()
}

170
weed/server/filer_server_handlers_write_upload.go

@ -6,9 +6,7 @@ import (
"io"
"io/ioutil"
"net/http"
"runtime"
"strings"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
@ -20,143 +18,75 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU()))
)
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) {
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
var fileChunks []*filer_pb.FileChunk
md5Hash = md5.New()
md5Hash := md5.New()
var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
// save small content directly
if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) {
smallContent, err = ioutil.ReadAll(partReader)
dataSize = int64(len(smallContent))
return
}
chunkOffset := int64(0)
var smallContent []byte
resultsChan := make(chan *ChunkCreationResult, 2)
for {
limitedReader := io.LimitReader(partReader, int64(chunkSize))
var waitForAllData sync.WaitGroup
waitForAllData.Add(1)
go func() {
// process upload results
defer waitForAllData.Done()
for result := range resultsChan {
if result.err != nil {
err = result.err
continue
}
// Save to chunk manifest structure
fileChunks = append(fileChunks, result.chunk)
data, err := ioutil.ReadAll(limitedReader)
if err != nil {
return nil, nil, 0, err, nil
}
}()
var lock sync.Mutex
readOffset := int64(0)
var wg sync.WaitGroup
for err == nil {
wg.Add(1)
request := func() {
defer wg.Done()
var localOffset int64
// read from the input
lock.Lock()
localOffset = readOffset
limitedReader := io.LimitReader(partReader, int64(chunkSize))
data, readErr := ioutil.ReadAll(limitedReader)
readOffset += int64(len(data))
lock.Unlock()
// handle read errors
if readErr != nil {
if err == nil {
err = readErr
}
if readErr != io.EOF {
resultsChan <- &ChunkCreationResult{
err: readErr,
}
}
return
if chunkOffset == 0 && !isAppend(r) {
if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
smallContent = data
chunkOffset += int64(len(data))
break
}
if len(data) == 0 {
readErr = io.EOF
if err == nil {
err = readErr
}
return
}
dataReader := util.NewBytesReader(data)
// retry to assign a different file id
var fileId, urlLocation string
var auth security.EncodedJwt
var assignErr, uploadErr error
var uploadResult *operation.UploadResult
for i := 0; i < 3; i++ {
// assign one file id for one chunk
fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
if assignErr != nil {
return nil, nil, 0, assignErr, nil
}
// upload
dataReader := util.NewBytesReader(data)
fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType)
// upload the chunk to the volume server
uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
if err == nil {
err = uploadErr
}
resultsChan <- &ChunkCreationResult{
err: uploadErr,
}
return
}
glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size))
// send back uploaded file chunk
resultsChan <- &ChunkCreationResult{
chunk: uploadResult.ToPbFileChunk(fileId, localOffset),
time.Sleep(251 * time.Millisecond)
continue
}
break
}
if uploadErr != nil {
return nil, nil, 0, uploadErr, nil
}
limitedUploadProcessor.Execute(request)
}
go func() {
wg.Wait()
close(resultsChan)
}()
waitForAllData.Wait()
if err == io.EOF {
err = nil
}
// if last chunk exhausted the reader exactly at the border
if uploadResult.Size == 0 {
break
}
return fileChunks, md5Hash, readOffset, err, nil
}
// Save to chunk manifest structure
fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
type ChunkCreationResult struct {
chunk *filer_pb.FileChunk
err error
}
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) {
// retry to assign a different file id
var fileId, urlLocation string
var auth security.EncodedJwt
var assignErr, uploadErr error
var uploadResult *operation.UploadResult
for i := 0; i < 3; i++ {
// assign one file id for one chunk
fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
if assignErr != nil {
return "", nil, assignErr
}
// reset variables for the next chunk
chunkOffset = chunkOffset + int64(uploadResult.Size)
// upload the chunk to the volume server
uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
time.Sleep(251 * time.Millisecond)
continue
// if last chunk was not at full chunk size, but already exhausted the reader
if int64(uploadResult.Size) < int64(chunkSize) {
break
}
break
}
return fileId, uploadResult, uploadErr
return fileChunks, md5Hash, chunkOffset, nil, smallContent
}
func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {

106
weed/server/gateway_server.go

@ -0,0 +1,106 @@
package weed_server
import (
"github.com/chrislusf/seaweedfs/weed/operation"
"google.golang.org/grpc"
"math/rand"
"net/http"
"github.com/chrislusf/seaweedfs/weed/util"
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
_ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
"github.com/chrislusf/seaweedfs/weed/glog"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
_ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
_ "github.com/chrislusf/seaweedfs/weed/notification/log"
"github.com/chrislusf/seaweedfs/weed/security"
)
type GatewayOption struct {
Masters []string
Filers []string
MaxMB int
IsSecure bool
}
type GatewayServer struct {
option *GatewayOption
secret security.SigningKey
grpcDialOption grpc.DialOption
}
func NewGatewayServer(defaultMux *http.ServeMux, option *GatewayOption) (fs *GatewayServer, err error) {
fs = &GatewayServer{
option: option,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
}
if len(option.Masters) == 0 {
glog.Fatal("master list is required!")
}
defaultMux.HandleFunc("/blobs/", fs.blobsHandler)
defaultMux.HandleFunc("/files/", fs.filesHandler)
defaultMux.HandleFunc("/topics/", fs.topicsHandler)
return fs, nil
}
func (fs *GatewayServer) getMaster() string {
randMaster := rand.Intn(len(fs.option.Masters))
return fs.option.Masters[randMaster]
}
func (fs *GatewayServer) blobsHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "DELETE":
chunkId := r.URL.Path[len("/blobs/"):]
fullUrl, err := operation.LookupFileId(fs.getMaster, chunkId)
if err != nil {
writeJsonError(w, r, http.StatusNotFound, err)
return
}
var jwtAuthorization security.EncodedJwt
if fs.option.IsSecure {
jwtAuthorization = operation.LookupJwt(fs.getMaster(), chunkId)
}
body, statusCode, err := util.DeleteProxied(fullUrl, string(jwtAuthorization))
if err != nil {
writeJsonError(w, r, http.StatusNotFound, err)
return
}
w.WriteHeader(statusCode)
w.Write(body)
case "POST":
submitForClientHandler(w, r, fs.getMaster, fs.grpcDialOption)
}
}
func (fs *GatewayServer) filesHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "DELETE":
case "POST":
}
}
func (fs *GatewayServer) topicsHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "POST":
}
}

2
weed/util/constants.go

@ -5,7 +5,7 @@ import (
)
var (
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 38)
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 39)
COMMIT = ""
)

21
weed/util/http_util.go

@ -124,6 +124,27 @@ func Delete(url string, jwt string) error {
return errors.New(string(body))
}
func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
req, err := http.NewRequest("DELETE", url, nil)
if jwt != "" {
req.Header.Set("Authorization", "BEARER "+string(jwt))
}
if err != nil {
return
}
resp, err := client.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
return
}
httpStatus = resp.StatusCode
return
}
func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
r, err := client.PostForm(url, values)
if err != nil {

Loading…
Cancel
Save