Browse Source

Merge pull request #6 from chrislusf/master

merge
pull/84/head
yourchanges 10 years ago
parent
commit
2adf4a108b
  1. 2
      docs/replication.rst
  2. 1
      go/operation/data_struts.go
  3. 5
      go/operation/delete_content.go
  4. 32
      go/operation/submit.go
  5. 10
      go/operation/upload_content.go
  6. 111
      go/security/guard.go
  7. 72
      go/security/jwt.go
  8. 14
      go/storage/store.go
  9. 26
      go/topology/store_replicate.go
  10. 2
      go/util/constants.go
  11. 7
      go/util/http_util.go
  12. 10
      go/weed/benchmark.go
  13. 4
      go/weed/filer.go
  14. 11
      go/weed/master.go
  15. 9
      go/weed/mount_std.go
  16. 17
      go/weed/server.go
  17. 22
      go/weed/upload.go
  18. 4
      go/weed/weed_server/common.go
  19. 7
      go/weed/weed_server/filer_server.go
  20. 6
      go/weed/weed_server/filer_server_handlers.go
  21. 29
      go/weed/weed_server/master_server.go
  22. 5
      go/weed/weed_server/master_server_handlers_admin.go
  23. 48
      go/weed/weed_server/volume_server.go
  24. 7
      go/weed/weed_server/volume_server_handlers.go

2
docs/replication.rst

@ -10,7 +10,7 @@ Basically, the way it works is:
.. code-block:: bash .. code-block:: bash
./weed master -defaultReplicationType=001
./weed master -defaultReplication=001
2. start volume servers as this: 2. start volume servers as this:

1
go/operation/data_struts.go

@ -2,5 +2,6 @@ package operation
type JoinResult struct { type JoinResult struct {
VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"` VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"`
SecretKey string `json:"secretKey,omitempty"`
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }

5
go/operation/delete_content.go

@ -7,6 +7,7 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/chrislusf/weed-fs/go/security"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
) )
@ -16,12 +17,12 @@ type DeleteResult struct {
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
func DeleteFile(master string, fileId string) error {
func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error {
fileUrl, err := LookupFileId(master, fileId) fileUrl, err := LookupFileId(master, fileId)
if err != nil { if err != nil {
return err return err
} }
return util.Delete(fileUrl)
return util.Delete(fileUrl, jwt)
} }
func ParseFileId(fid string) (vid string, key_cookie string, err error) { func ParseFileId(fid string) (vid string, key_cookie string, err error) {

32
go/operation/submit.go

@ -10,6 +10,7 @@ import (
"strings" "strings"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/security"
) )
type FilePart struct { type FilePart struct {
@ -34,7 +35,10 @@ type SubmitResult struct {
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
func SubmitFiles(master string, files []FilePart, replication string, collection string, ttl string, maxMB int) ([]SubmitResult, error) {
func SubmitFiles(master string, files []FilePart,
replication string, collection string, ttl string, maxMB int,
secret security.Secret,
) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files)) results := make([]SubmitResult, len(files))
for index, file := range files { for index, file := range files {
results[index].FileName = file.FileName results[index].FileName = file.FileName
@ -54,7 +58,7 @@ func SubmitFiles(master string, files []FilePart, replication string, collection
file.Server = ret.PublicUrl file.Server = ret.PublicUrl
file.Replication = replication file.Replication = replication
file.Collection = collection file.Collection = collection
results[index].Size, err = file.Upload(maxMB, master)
results[index].Size, err = file.Upload(maxMB, master, secret)
if err != nil { if err != nil {
results[index].Error = err.Error() results[index].Error = err.Error()
} }
@ -101,7 +105,8 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil return ret, nil
} }
func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error) {
func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
jwt := security.GenJwt(secret, fi.Fid)
fileUrl := "http://" + fi.Server + "/" + fi.Fid fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 { if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@ -114,16 +119,20 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error)
chunks := fi.FileSize/chunkSize + 1 chunks := fi.FileSize/chunkSize + 1
fids := make([]string, 0) fids := make([]string, 0)
for i := int64(0); i < chunks; i++ { for i := int64(0); i < chunks; i++ {
id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection, fi.Ttl)
id, count, e := upload_one_chunk(
fi.FileName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(fi.Reader, chunkSize),
master, fi.Replication, fi.Collection, fi.Ttl,
jwt)
if e != nil { if e != nil {
return 0, e return 0, e
} }
fids = append(fids, id) fids = append(fids, id)
retSize += count retSize += count
} }
err = upload_file_id_list(fileUrl, fi.FileName+"-list", fids)
err = upload_file_id_list(fileUrl, fi.FileName+"-list", fids, jwt)
} else { } else {
ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType)
ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)
if e != nil { if e != nil {
return 0, e return 0, e
} }
@ -132,24 +141,27 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error)
return return
} }
func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string, ttl string) (fid string, size uint32, e error) {
func upload_one_chunk(filename string, reader io.Reader, master,
replication string, collection string, ttl string, jwt security.EncodedJwt,
) (fid string, size uint32, e error) {
ret, err := Assign(master, 1, replication, collection, ttl) ret, err := Assign(master, 1, replication, collection, ttl)
if err != nil { if err != nil {
return "", 0, err return "", 0, err
} }
fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
uploadResult, uploadError := Upload(fileUrl, filename, reader, false, "application/octet-stream")
uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
"application/octet-stream", jwt)
if uploadError != nil { if uploadError != nil {
return fid, 0, uploadError return fid, 0, uploadError
} }
return fid, uploadResult.Size, nil return fid, uploadResult.Size, nil
} }
func upload_file_id_list(fileUrl, filename string, fids []string) error {
func upload_file_id_list(fileUrl, filename string, fids []string, jwt security.EncodedJwt) error {
var buf bytes.Buffer var buf bytes.Buffer
buf.WriteString(strings.Join(fids, "\n")) buf.WriteString(strings.Join(fids, "\n"))
glog.V(4).Info("Uploading final list ", filename, " to ", fileUrl, "...") glog.V(4).Info("Uploading final list ", filename, " to ", fileUrl, "...")
_, e := Upload(fileUrl, filename, &buf, false, "text/plain")
_, e := Upload(fileUrl, filename, &buf, false, "text/plain", jwt)
return e return e
} }

10
go/operation/upload_content.go

@ -15,6 +15,7 @@ import (
"strings" "strings"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/security"
) )
type UploadResult struct { type UploadResult struct {
@ -35,13 +36,13 @@ func init() {
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string) (*UploadResult, error) {
func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
return upload_content(uploadUrl, func(w io.Writer) (err error) { return upload_content(uploadUrl, func(w io.Writer) (err error) {
_, err = io.Copy(w, reader) _, err = io.Copy(w, reader)
return return
}, filename, isGzipped, mtype)
}, filename, isGzipped, mtype, jwt)
} }
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string) (*UploadResult, error) {
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
body_buf := bytes.NewBufferString("") body_buf := bytes.NewBufferString("")
body_writer := multipart.NewWriter(body_buf) body_writer := multipart.NewWriter(body_buf)
h := make(textproto.MIMEHeader) h := make(textproto.MIMEHeader)
@ -55,6 +56,9 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
if isGzipped { if isGzipped {
h.Set("Content-Encoding", "gzip") h.Set("Content-Encoding", "gzip")
} }
if jwt != "" {
h.Set("Authorization", "BEARER "+string(jwt))
}
file_writer, cp_err := body_writer.CreatePart(h) file_writer, cp_err := body_writer.CreatePart(h)
if cp_err != nil { if cp_err != nil {
glog.V(0).Infoln("error creating form file", cp_err.Error()) glog.V(0).Infoln("error creating form file", cp_err.Error())

111
go/security/guard.go

@ -5,11 +5,8 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"strings"
"time"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/dgrijalva/jwt-go"
) )
var ( var (
@ -44,24 +41,24 @@ https://github.com/pkieltyka/jwtauth/blob/master/jwtauth.go
*/ */
type Guard struct { type Guard struct {
whiteList []string whiteList []string
secretKey string
SecretKey Secret
isActive bool isActive bool
} }
func NewGuard(whiteList []string, secretKey string) *Guard { func NewGuard(whiteList []string, secretKey string) *Guard {
g := &Guard{whiteList: whiteList, secretKey: secretKey}
g.isActive = len(g.whiteList) != 0 || len(g.secretKey) != 0
g := &Guard{whiteList: whiteList, SecretKey: Secret(secretKey)}
g.isActive = len(g.whiteList) != 0 || len(g.SecretKey) != 0
return g return g
} }
func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
func (g *Guard) WhiteList(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
if !g.isActive { if !g.isActive {
//if no security needed, just skip all checkings //if no security needed, just skip all checkings
return f return f
} }
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if err := g.doCheck(w, r); err != nil {
if err := g.checkWhiteList(w, r); err != nil {
w.WriteHeader(http.StatusUnauthorized) w.WriteHeader(http.StatusUnauthorized)
return return
} }
@ -69,76 +66,62 @@ func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w ht
} }
} }
func (g *Guard) NewToken() (tokenString string, err error) {
m := make(map[string]interface{})
m["exp"] = time.Now().Unix() + 10
return g.Encode(m)
}
func (g *Guard) Encode(claims map[string]interface{}) (tokenString string, err error) {
func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
if !g.isActive { if !g.isActive {
return "", nil
//if no security needed, just skip all checkings
return f
}
return func(w http.ResponseWriter, r *http.Request) {
if err := g.checkJwt(w, r); err != nil {
w.WriteHeader(http.StatusUnauthorized)
return
}
f(w, r)
} }
t := jwt.New(jwt.GetSigningMethod("HS256"))
t.Claims = claims
return t.SignedString(g.secretKey)
} }
func (g *Guard) Decode(tokenString string) (token *jwt.Token, err error) {
return jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
return g.secretKey, nil
})
}
func (g *Guard) checkWhiteList(w http.ResponseWriter, r *http.Request) error {
if len(g.whiteList) == 0 {
return nil
}
func (g *Guard) doCheck(w http.ResponseWriter, r *http.Request) error {
if len(g.whiteList) != 0 {
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err == nil {
for _, ip := range g.whiteList {
if ip == host {
return nil
}
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err == nil {
for _, ip := range g.whiteList {
if ip == host {
return nil
} }
} }
} }
if len(g.secretKey) != 0 {
// Get token from query params
tokenStr := r.URL.Query().Get("jwt")
glog.V(1).Infof("Not in whitelist: %s", r.RemoteAddr)
return fmt.Errorf("Not in whitelis: %s", r.RemoteAddr)
}
// Get token from authorization header
if tokenStr == "" {
bearer := r.Header.Get("Authorization")
if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" {
tokenStr = bearer[7:]
}
}
func (g *Guard) checkJwt(w http.ResponseWriter, r *http.Request) error {
if g.checkWhiteList(w, r) == nil {
return nil
}
// Get token from cookie
if tokenStr == "" {
cookie, err := r.Cookie("jwt")
if err == nil {
tokenStr = cookie.Value
}
}
if len(g.SecretKey) == 0 {
return nil
}
if tokenStr == "" {
return ErrUnauthorized
}
tokenStr := GetJwt(r)
// Verify the token
token, err := g.Decode(tokenStr)
if err != nil {
glog.V(1).Infof("Token verification error from %s: %v", r.RemoteAddr, err)
return ErrUnauthorized
}
if !token.Valid {
glog.V(1).Infof("Token invliad from %s: %v", r.RemoteAddr, tokenStr)
return ErrUnauthorized
}
if tokenStr == "" {
return ErrUnauthorized
}
// Verify the token
token, err := DecodeJwt(g.SecretKey, tokenStr)
if err != nil {
glog.V(1).Infof("Token verification error from %s: %v", r.RemoteAddr, err)
return ErrUnauthorized
}
if !token.Valid {
glog.V(1).Infof("Token invliad from %s: %v", r.RemoteAddr, tokenStr)
return ErrUnauthorized
} }
glog.V(1).Infof("No permission from %s", r.RemoteAddr) glog.V(1).Infof("No permission from %s", r.RemoteAddr)

72
go/security/jwt.go

@ -0,0 +1,72 @@
package security
import (
"net/http"
"strings"
"time"
"github.com/chrislusf/weed-fs/go/glog"
jwt "github.com/dgrijalva/jwt-go"
)
type EncodedJwt string
type Secret string
func GenJwt(secret Secret, fileId string) EncodedJwt {
if secret == "" {
return ""
}
t := jwt.New(jwt.GetSigningMethod("HS256"))
t.Claims["exp"] = time.Now().Unix() + 10
t.Claims["sub"] = fileId
encoded, e := t.SignedString(secret)
if e != nil {
glog.V(0).Infof("Failed to sign claims: %v", t.Claims)
return ""
}
return EncodedJwt(encoded)
}
func GetJwt(r *http.Request) EncodedJwt {
// Get token from query params
tokenStr := r.URL.Query().Get("jwt")
// Get token from authorization header
if tokenStr == "" {
bearer := r.Header.Get("Authorization")
if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" {
tokenStr = bearer[7:]
}
}
// Get token from cookie
if tokenStr == "" {
cookie, err := r.Cookie("jwt")
if err == nil {
tokenStr = cookie.Value
}
}
return EncodedJwt(tokenStr)
}
func EncodeJwt(secret Secret, claims map[string]interface{}) (EncodedJwt, error) {
if secret == "" {
return "", nil
}
t := jwt.New(jwt.GetSigningMethod("HS256"))
t.Claims = claims
encoded, e := t.SignedString(secret)
return EncodedJwt(encoded), e
}
func DecodeJwt(secret Secret, tokenString EncodedJwt) (token *jwt.Token, err error) {
// check exp, nbf
return jwt.Parse(string(tokenString), func(token *jwt.Token) (interface{}, error) {
return secret, nil
})
}

14
go/storage/store.go

@ -11,6 +11,7 @@ import (
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation" "github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/security"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
) )
@ -260,7 +261,7 @@ func (s *Store) SetRack(rack string) {
func (s *Store) SetBootstrapMaster(bootstrapMaster string) { func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
s.masterNodes = NewMasterNodes(bootstrapMaster) s.masterNodes = NewMasterNodes(bootstrapMaster)
} }
func (s *Store) Join() (masterNode string, e error) {
func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) {
masterNode, e = s.masterNodes.findMaster() masterNode, e = s.masterNodes.findMaster()
if e != nil { if e != nil {
return return
@ -314,22 +315,23 @@ func (s *Store) Join() (masterNode string, e error) {
data, err := proto.Marshal(joinMessage) data, err := proto.Marshal(joinMessage)
if err != nil { if err != nil {
return "", err
return "", "", err
} }
jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data) jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data)
if err != nil { if err != nil {
s.masterNodes.reset() s.masterNodes.reset()
return "", err
return "", "", err
} }
var ret operation.JoinResult var ret operation.JoinResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil { if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return masterNode, err
return masterNode, "", err
} }
if ret.Error != "" { if ret.Error != "" {
return masterNode, errors.New(ret.Error)
return masterNode, "", errors.New(ret.Error)
} }
s.volumeSizeLimit = ret.VolumeSizeLimit s.volumeSizeLimit = ret.VolumeSizeLimit
secretKey = security.Secret(ret.SecretKey)
s.connected = true s.connected = true
return return
} }
@ -353,7 +355,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
} }
if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) { if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit) glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
if _, e := s.Join(); e != nil {
if _, _, e := s.Join(); e != nil {
glog.V(0).Infoln("error when reporting size:", e) glog.V(0).Infoln("error when reporting size:", e)
} }
} }

26
go/topology/store_replicate.go

@ -7,11 +7,18 @@ import (
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation" "github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/security"
"github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
) )
func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) {
func ReplicatedWrite(masterNode string, s *storage.Store,
volumeId storage.VolumeId, needle *storage.Needle,
r *http.Request) (size uint32, errorStatus string) {
//check JWT
jwt := security.GetJwt(r)
ret, err := s.Write(volumeId, needle) ret, err := s.Write(volumeId, needle)
needToReplicate := !s.HasVolume(volumeId) needToReplicate := !s.HasVolume(volumeId)
if err != nil { if err != nil {
@ -27,7 +34,10 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum
if needToReplicate { //send to other replica locations if needToReplicate { //send to other replica locations
if r.FormValue("type") != "replicate" { if r.FormValue("type") != "replicate" {
if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
_, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime))
_, err := operation.Upload(
"http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10),
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
jwt)
return err == nil return err == nil
}) { }) {
ret = 0 ret = 0
@ -41,7 +51,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum
volumeId.String() + ": " + err.Error() volumeId.String() + ": " + err.Error()
} else { } else {
distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
}) })
} }
} }
@ -49,7 +59,13 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum
return return
} }
func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId, n *storage.Needle, r *http.Request) (ret uint32) {
func ReplicatedDelete(masterNode string, store *storage.Store,
volumeId storage.VolumeId, n *storage.Needle,
r *http.Request) (ret uint32) {
//check JWT
jwt := security.GetJwt(r)
ret, err := store.Delete(volumeId, n) ret, err := store.Delete(volumeId, n)
if err != nil { if err != nil {
glog.V(0).Infoln("delete error:", err) glog.V(0).Infoln("delete error:", err)
@ -63,7 +79,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.
if needToReplicate { //send to other replica locations if needToReplicate { //send to other replica locations
if r.FormValue("type") != "replicate" { if r.FormValue("type") != "replicate" {
if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool { if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
}) { }) {
ret = 0 ret = 0
} }

2
go/util/constants.go

@ -1,5 +1,5 @@
package util package util
const ( const (
VERSION = "0.68"
VERSION = "0.69 beta"
) )

7
go/util/http_util.go

@ -7,6 +7,8 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
"github.com/chrislusf/weed-fs/go/security"
) )
var ( var (
@ -63,8 +65,11 @@ func Get(url string) ([]byte, error) {
return b, nil return b, nil
} }
func Delete(url string) error {
func Delete(url string, jwt security.EncodedJwt) error {
req, err := http.NewRequest("DELETE", url, nil) req, err := http.NewRequest("DELETE", url, nil)
if jwt != "" {
req.Header.Set("Authorization", "BEARER "+string(jwt))
}
if err != nil { if err != nil {
return err return err
} }

10
go/weed/benchmark.go

@ -16,6 +16,7 @@ import (
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation" "github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/security"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
) )
@ -32,6 +33,7 @@ type BenchmarkOptions struct {
collection *string collection *string
cpuprofile *string cpuprofile *string
maxCpu *int maxCpu *int
secretKey *string
vid2server map[string]string //cache for vid locations vid2server map[string]string //cache for vid locations
} }
@ -56,6 +58,7 @@ func init() {
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.secretKey = cmdBenchmark.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
b.vid2server = make(map[string]string) b.vid2server = make(map[string]string)
sharedBytes = make([]byte, 1024) sharedBytes = make([]byte, 1024)
} }
@ -181,6 +184,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
defer wait.Done() defer wait.Done()
delayedDeleteChan := make(chan *delayedFile, 100) delayedDeleteChan := make(chan *delayedFile, 100)
var waitForDeletions sync.WaitGroup var waitForDeletions sync.WaitGroup
secret := security.Secret(*b.secretKey)
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
waitForDeletions.Add(1) waitForDeletions.Add(1)
go func() { go func() {
@ -189,7 +194,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
if df.enterTime.After(time.Now()) { if df.enterTime.After(time.Now()) {
time.Sleep(df.enterTime.Sub(time.Now())) time.Sleep(df.enterTime.Sub(time.Now()))
} }
if e := util.Delete("http://" + df.fp.Server + "/" + df.fp.Fid); e == nil {
if e := util.Delete("http://"+df.fp.Server+"/"+df.fp.Fid,
security.GenJwt(secret, df.fp.Fid)); e == nil {
s.completed++ s.completed++
} else { } else {
s.failed++ s.failed++
@ -204,7 +210,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize} fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil { if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
if _, err := fp.Upload(0, *b.server); err == nil {
if _, err := fp.Upload(0, *b.server, secret); err == nil {
if rand.Intn(100) < *b.deletePercentage { if rand.Intn(100) < *b.deletePercentage {
s.total++ s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}

4
go/weed/filer.go

@ -22,6 +22,7 @@ type FilerOptions struct {
defaultReplicaPlacement *string defaultReplicaPlacement *string
dir *string dir *string
redirectOnRead *bool redirectOnRead *bool
secretKey *string
cassandra_server *string cassandra_server *string
cassandra_keyspace *string cassandra_keyspace *string
redis_server *string redis_server *string
@ -40,6 +41,8 @@ func init() {
f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server") f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server") f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server")
f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
} }
var cmdFiler = &Command{ var cmdFiler = &Command{
@ -73,6 +76,7 @@ func runFiler(cmd *Command, args []string) bool {
r := http.NewServeMux() r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection, _, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection,
*f.defaultReplicaPlacement, *f.redirectOnRead, *f.defaultReplicaPlacement, *f.redirectOnRead,
*f.secretKey,
*f.cassandra_server, *f.cassandra_keyspace, *f.cassandra_server, *f.cassandra_keyspace,
*f.redis_server, *f.redis_database, *f.redis_server, *f.redis_database,
) )

11
go/weed/master.go

@ -29,8 +29,9 @@ var cmdMaster = &Command{
var ( var (
mport = cmdMaster.Flag.Int("port", 9333, "http listen port") mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
masterIp = cmdMaster.Flag.String("ip", "", "master listening ip address, default to listen on all network interfaces")
masterIp = cmdMaster.Flag.String("ip", "localhost", "master <ip>|<server> address")
masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
mPublicUrl = cmdMaster.Flag.String("publicUrl", "", "peer accessible <ip>|<server_name>:port")
metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list") masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list")
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
@ -41,7 +42,7 @@ var (
mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
masterSecureKey = cmdMaster.Flag.String("secure.key", "", "secret key to check permission")
masterSecureKey = cmdMaster.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
masterWhiteList []string masterWhiteList []string
) )
@ -75,10 +76,10 @@ func runMaster(cmd *Command, args []string) bool {
go func() { go func() {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
if *masterIp == "" {
*masterIp = "localhost"
}
myMasterAddress := *masterIp + ":" + strconv.Itoa(*mport) myMasterAddress := *masterIp + ":" + strconv.Itoa(*mport)
if *mPublicUrl != "" {
myMasterAddress = *mPublicUrl
}
var peers []string var peers []string
if *masterPeers != "" { if *masterPeers != "" {
peers = strings.Split(*masterPeers, ",") peers = strings.Split(*masterPeers, ",")

9
go/weed/mount_std.go

@ -13,6 +13,7 @@ import (
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
"golang.org/x/net/context"
) )
func runMount(cmd *Command, args []string) bool { func runMount(cmd *Command, args []string) bool {
@ -55,7 +56,7 @@ type File struct {
func (File) Attr() fuse.Attr { func (File) Attr() fuse.Attr {
return fuse.Attr{Mode: 0444} return fuse.Attr{Mode: 0444}
} }
func (File) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
func (File) ReadAll(ctx context.Context) ([]byte, error) {
return []byte("hello, world\n"), nil return []byte("hello, world\n"), nil
} }
@ -68,7 +69,7 @@ func (dir Dir) Attr() fuse.Attr {
return fuse.Attr{Inode: dir.Id, Mode: os.ModeDir | 0555} return fuse.Attr{Inode: dir.Id, Mode: os.ModeDir | 0555}
} }
func (dir Dir) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) {
func (dir Dir) Lookup(ctx context.Context, name string) (fs.Node, error) {
files_result, e := filer.ListFiles(*mountOptions.filer, dir.Path, name) files_result, e := filer.ListFiles(*mountOptions.filer, dir.Path, name)
if e != nil { if e != nil {
return nil, fuse.ENOENT return nil, fuse.ENOENT
@ -81,11 +82,11 @@ func (dir Dir) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) {
type WFS struct{} type WFS struct{}
func (WFS) Root() (fs.Node, fuse.Error) {
func (WFS) Root() (fs.Node, error) {
return Dir{}, nil return Dir{}, nil
} }
func (dir *Dir) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
func (dir *Dir) ReadDir(ctx context.Context) ([]fuse.Dirent, error) {
ret := make([]fuse.Dirent, 0) ret := make([]fuse.Dirent, 0)
if dirs, e := filer.ListDirectories(*mountOptions.filer, dir.Path); e == nil { if dirs, e := filer.ListDirectories(*mountOptions.filer, dir.Path); e == nil {
for _, d := range dirs.Directories { for _, d := range dirs.Directories {

17
go/weed/server.go

@ -47,7 +47,7 @@ var cmdServer = &Command{
} }
var ( var (
serverIp = cmdServer.Flag.String("ip", "", "ip or server name")
serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name")
serverPublicUrl = cmdServer.Flag.String("publicUrl", "", "publicly accessible address") serverPublicUrl = cmdServer.Flag.String("publicUrl", "", "publicly accessible address")
serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
@ -56,7 +56,7 @@ var (
serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
serverPeers = cmdServer.Flag.String("master.peers", "", "other master nodes in comma separated ip:masterPort list") serverPeers = cmdServer.Flag.String("master.peers", "", "other master nodes in comma separated ip:masterPort list")
serverSecureKey = cmdServer.Flag.String("secure.key", "", "secret key to ensure authenticated access")
serverSecureKey = cmdServer.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
@ -86,10 +86,10 @@ func init() {
filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server") filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server")
} }
func runServer(cmd *Command, args []string) bool { func runServer(cmd *Command, args []string) bool {
filerOptions.secretKey = serverSecureKey
if *serverOptions.cpuprofile != "" { if *serverOptions.cpuprofile != "" {
f, err := os.Create(*serverOptions.cpuprofile) f, err := os.Create(*serverOptions.cpuprofile)
if err != nil { if err != nil {
@ -99,10 +99,6 @@ func runServer(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }
if *serverIp == "" {
*serverIp = "localhost"
}
if *filerOptions.redirectOnRead { if *filerOptions.redirectOnRead {
*isStartingFiler = true *isStartingFiler = true
} }
@ -145,13 +141,13 @@ func runServer(cmd *Command, args []string) bool {
*filerOptions.dir = *masterMetaFolder + "/filer" *filerOptions.dir = *masterMetaFolder + "/filer"
os.MkdirAll(*filerOptions.dir, 0700) os.MkdirAll(*filerOptions.dir, 0700)
} }
if err := util.TestFolderWritable(*filerOptions.dir); err != nil {
glog.Fatalf("Check Mapping Meta Folder (-filer.dir=\"%s\") Writable: %s", *filerOptions.dir, err)
}
} }
if err := util.TestFolderWritable(*masterMetaFolder); err != nil { if err := util.TestFolderWritable(*masterMetaFolder); err != nil {
glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err) glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err)
} }
if err := util.TestFolderWritable(*filerOptions.dir); err != nil {
glog.Fatalf("Check Mapping Meta Folder (-filer.dir=\"%s\") Writable: %s", *filerOptions.dir, err)
}
if *serverWhiteListOption != "" { if *serverWhiteListOption != "" {
serverWhiteList = strings.Split(*serverWhiteListOption, ",") serverWhiteList = strings.Split(*serverWhiteListOption, ",")
@ -162,6 +158,7 @@ func runServer(cmd *Command, args []string) bool {
r := http.NewServeMux() r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, _, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
*filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead, *filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead,
*filerOptions.secretKey,
"", "", "", "",
"", 0, "", 0,
) )

22
go/weed/upload.go

@ -7,6 +7,7 @@ import (
"path/filepath" "path/filepath"
"github.com/chrislusf/weed-fs/go/operation" "github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/security"
) )
var ( var (
@ -15,6 +16,7 @@ var (
uploadDir *string uploadDir *string
uploadTtl *string uploadTtl *string
include *string include *string
uploadSecretKey *string
maxMB *int maxMB *int
) )
@ -28,13 +30,14 @@ func init() {
uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name") uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name")
uploadTtl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") uploadTtl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit") maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
uploadSecretKey = cmdUpload.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
} }
var cmdUpload = &Command{ var cmdUpload = &Command{
UsageLine: "upload -server=localhost:9333 file1 [file2 file3]\n upload -server=localhost:9333 -dir=one_directory -include=*.pdf", UsageLine: "upload -server=localhost:9333 file1 [file2 file3]\n upload -server=localhost:9333 -dir=one_directory -include=*.pdf",
Short: "upload one or a list of files", Short: "upload one or a list of files",
Long: `upload one or a list of files, or batch upload one whole folder recursively. Long: `upload one or a list of files, or batch upload one whole folder recursively.
If uploading a list of files: If uploading a list of files:
It uses consecutive file keys for the list of files. It uses consecutive file keys for the list of files.
e.g. If the file1 uses key k, file2 can be read via k_1 e.g. If the file1 uses key k, file2 can be read via k_1
@ -42,18 +45,19 @@ var cmdUpload = &Command{
If uploading a whole folder recursively: If uploading a whole folder recursively:
All files under the folder and subfolders will be uploaded, each with its own file key. All files under the folder and subfolders will be uploaded, each with its own file key.
Optional parameter "-include" allows you to specify the file name patterns. Optional parameter "-include" allows you to specify the file name patterns.
If any file has a ".gz" extension, the content are considered gzipped already, and will be stored as is. If any file has a ".gz" extension, the content are considered gzipped already, and will be stored as is.
This can save volume server's gzipped processing and allow customizable gzip compression level. This can save volume server's gzipped processing and allow customizable gzip compression level.
The file name will strip out ".gz" and stored. For example, "jquery.js.gz" will be stored as "jquery.js". The file name will strip out ".gz" and stored. For example, "jquery.js.gz" will be stored as "jquery.js".
If "maxMB" is set to a positive number, files larger than it would be split into chunks and uploaded separatedly.
The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned.
If "maxMB" is set to a positive number, files larger than it would be split into chunks and uploaded separatedly.
The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned.
`, `,
} }
func runUpload(cmd *Command, args []string) bool { func runUpload(cmd *Command, args []string) bool {
secret := security.Secret(*uploadSecretKey)
if len(cmdUpload.Flag.Args()) == 0 { if len(cmdUpload.Flag.Args()) == 0 {
if *uploadDir == "" { if *uploadDir == "" {
return false return false
@ -70,7 +74,9 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil { if e != nil {
return e return e
} }
results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
results, e := operation.SubmitFiles(*server, parts,
*uploadReplication, *uploadCollection,
*uploadTtl, *maxMB, secret)
bytes, _ := json.Marshal(results) bytes, _ := json.Marshal(results)
fmt.Println(string(bytes)) fmt.Println(string(bytes))
if e != nil { if e != nil {
@ -87,7 +93,9 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil { if e != nil {
fmt.Println(e.Error()) fmt.Println(e.Error())
} }
results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
results, _ := operation.SubmitFiles(*server, parts,
*uploadReplication, *uploadCollection,
*uploadTtl, *maxMB, secret)
bytes, _ := json.Marshal(results) bytes, _ := json.Marshal(results)
fmt.Println(string(bytes)) fmt.Println(string(bytes))
} }

4
go/weed/weed_server/common.go

@ -12,6 +12,7 @@ import (
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation" "github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/security"
"github.com/chrislusf/weed-fs/go/stats" "github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
@ -75,6 +76,7 @@ func debug(params ...interface{}) {
} }
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) {
jwt := security.GetJwt(r)
m := make(map[string]interface{}) m := make(map[string]interface{})
if r.Method != "POST" { if r.Method != "POST" {
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
@ -102,7 +104,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
} }
debug("upload file to store", url) debug("upload file to store", url)
uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType)
uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, jwt)
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
return return

7
go/weed/weed_server/filer_server.go

@ -10,6 +10,7 @@ import (
"github.com/chrislusf/weed-fs/go/filer/flat_namespace" "github.com/chrislusf/weed-fs/go/filer/flat_namespace"
"github.com/chrislusf/weed-fs/go/filer/redis_store" "github.com/chrislusf/weed-fs/go/filer/redis_store"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/security"
) )
type FilerServer struct { type FilerServer struct {
@ -18,11 +19,13 @@ type FilerServer struct {
collection string collection string
defaultReplication string defaultReplication string
redirectOnRead bool redirectOnRead bool
secret security.Secret
filer filer.Filer filer filer.Filer
} }
func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string, func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string,
replication string, redirectOnRead bool, replication string, redirectOnRead bool,
secret string,
cassandra_server string, cassandra_keyspace string, cassandra_server string, cassandra_keyspace string,
redis_server string, redis_database int, redis_server string, redis_database int,
) (fs *FilerServer, err error) { ) (fs *FilerServer, err error) {
@ -56,3 +59,7 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle
return fs, nil return fs, nil
} }
func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
return security.GenJwt(fs.secret, fileId)
}

6
go/weed/weed_server/filer_server_handlers.go

@ -170,7 +170,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if ret.Name != "" { if ret.Name != "" {
path += ret.Name path += ret.Name
} else { } else {
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up
glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
writeJsonError(w, r, http.StatusInternalServerError, writeJsonError(w, r, http.StatusInternalServerError,
errors.New("Can not to write to folder "+path+" without a file name")) errors.New("Can not to write to folder "+path+" without a file name"))
@ -179,7 +179,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
glog.V(4).Infoln("saving", path, "=>", assignResult.Fid) glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil { if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
writeJsonError(w, r, http.StatusInternalServerError, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err)
return return
@ -199,7 +199,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
} else { } else {
fid, err = fs.filer.DeleteFile(r.URL.Path) fid, err = fs.filer.DeleteFile(r.URL.Path)
if err == nil { if err == nil {
err = operation.DeleteFile(fs.master, fid)
err = operation.DeleteFile(fs.master, fid, fs.jwt(fid))
} }
} }
if err == nil { if err == nil {

29
go/weed/weed_server/master_server.go

@ -23,6 +23,7 @@ type MasterServer struct {
pulseSeconds int pulseSeconds int
defaultReplicaPlacement string defaultReplicaPlacement string
garbageThreshold string garbageThreshold string
guard *security.Guard
Topo *topology.Topology Topo *topology.Topology
vg *topology.VolumeGrowth vg *topology.VolumeGrowth
@ -57,22 +58,22 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
ms.vg = topology.NewDefaultVolumeGrowth() ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
guard := security.NewGuard(whiteList, secureKey)
ms.guard = security.NewGuard(whiteList, secureKey)
r.HandleFunc("/dir/assign", ms.proxyToLeader(guard.Secure(ms.dirAssignHandler)))
r.HandleFunc("/dir/lookup", ms.proxyToLeader(guard.Secure(ms.dirLookupHandler)))
r.HandleFunc("/dir/join", ms.proxyToLeader(guard.Secure(ms.dirJoinHandler)))
r.HandleFunc("/dir/status", ms.proxyToLeader(guard.Secure(ms.dirStatusHandler)))
r.HandleFunc("/col/delete", ms.proxyToLeader(guard.Secure(ms.collectionDeleteHandler)))
r.HandleFunc("/vol/lookup", ms.proxyToLeader(guard.Secure(ms.volumeLookupHandler)))
r.HandleFunc("/vol/grow", ms.proxyToLeader(guard.Secure(ms.volumeGrowHandler)))
r.HandleFunc("/vol/status", ms.proxyToLeader(guard.Secure(ms.volumeStatusHandler)))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(guard.Secure(ms.volumeVacuumHandler)))
r.HandleFunc("/submit", guard.Secure(ms.submitFromMasterServerHandler))
r.HandleFunc("/delete", guard.Secure(ms.deleteFromMasterServerHandler))
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
r.HandleFunc("/dir/join", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoinHandler)))
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeLookupHandler)))
r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler))
r.HandleFunc("/{fileId}", ms.redirectHandler) r.HandleFunc("/{fileId}", ms.redirectHandler)
r.HandleFunc("/stats/counter", guard.Secure(statsCounterHandler))
r.HandleFunc("/stats/memory", guard.Secure(statsMemoryHandler))
r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
ms.Topo.StartRefreshWritableVolumes(garbageThreshold) ms.Topo.StartRefreshWritableVolumes(garbageThreshold)

5
go/weed/weed_server/master_server_handlers_admin.go

@ -58,7 +58,10 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
} }
ms.Topo.ProcessJoinMessage(joinMessage) ms.Topo.ProcessJoinMessage(joinMessage)
writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024})
writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{
VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
SecretKey: string(ms.guard.SecretKey),
})
} }
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {

48
go/weed/weed_server/volume_server.go

@ -3,6 +3,7 @@ package weed_server
import ( import (
"math/rand" "math/rand"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
@ -12,6 +13,7 @@ import (
type VolumeServer struct { type VolumeServer struct {
masterNode string masterNode string
mnLock sync.RWMutex
pulseSeconds int pulseSeconds int
dataCenter string dataCenter string
rack string rack string
@ -29,40 +31,42 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
whiteList []string, whiteList []string,
fixJpgOrientation bool) *VolumeServer { fixJpgOrientation bool) *VolumeServer {
vs := &VolumeServer{ vs := &VolumeServer{
masterNode: masterNode,
pulseSeconds: pulseSeconds, pulseSeconds: pulseSeconds,
dataCenter: dataCenter, dataCenter: dataCenter,
rack: rack, rack: rack,
FixJpgOrientation: fixJpgOrientation, FixJpgOrientation: fixJpgOrientation,
} }
vs.SetMasterNode(masterNode)
vs.store = storage.NewStore(port, adminPort, ip, publicUrl, folders, maxCounts) vs.store = storage.NewStore(port, adminPort, ip, publicUrl, folders, maxCounts)
vs.guard = security.NewGuard(whiteList, "") vs.guard = security.NewGuard(whiteList, "")
adminMux.HandleFunc("/status", vs.guard.Secure(vs.statusHandler))
adminMux.HandleFunc("/admin/assign_volume", vs.guard.Secure(vs.assignVolumeHandler))
adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.Secure(vs.vacuumVolumeCheckHandler))
adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.Secure(vs.vacuumVolumeCompactHandler))
adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.Secure(vs.vacuumVolumeCommitHandler))
adminMux.HandleFunc("/admin/freeze_volume", vs.guard.Secure(vs.freezeVolumeHandler))
adminMux.HandleFunc("/admin/delete_collection", vs.guard.Secure(vs.deleteCollectionHandler))
adminMux.HandleFunc("/stats/counter", vs.guard.Secure(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.guard.Secure(statsMemoryHandler))
adminMux.HandleFunc("/stats/disk", vs.guard.Secure(vs.statsDiskHandler))
adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
adminMux.HandleFunc("/admin/assign_volume", vs.guard.WhiteList(vs.assignVolumeHandler))
adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler))
adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler))
adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler))
adminMux.HandleFunc("/admin/freeze_volume", vs.guard.WhiteList(vs.freezeVolumeHandler))
adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler))
adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
publicMux.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler)) publicMux.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler))
publicMux.HandleFunc("/", vs.storeHandler) publicMux.HandleFunc("/", vs.storeHandler)
go func() { go func() {
connected := true connected := true
vs.store.SetBootstrapMaster(vs.masterNode)
vs.store.SetBootstrapMaster(vs.GetMasterNode())
vs.store.SetDataCenter(vs.dataCenter) vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack) vs.store.SetRack(vs.rack)
for { for {
master, err := vs.store.Join()
master, secretKey, err := vs.store.Join()
if err == nil { if err == nil {
if !connected { if !connected {
connected = true connected = true
vs.masterNode = master
vs.SetMasterNode(master)
vs.guard.SecretKey = secretKey
glog.V(0).Infoln("Volume Server Connected with master at", master) glog.V(0).Infoln("Volume Server Connected with master at", master)
} }
} else { } else {
@ -82,8 +86,24 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
return vs return vs
} }
func (vs *VolumeServer) GetMasterNode() string {
vs.mnLock.RLock()
defer vs.mnLock.RUnlock()
return vs.masterNode
}
func (vs *VolumeServer) SetMasterNode(masterNode string) {
vs.mnLock.Lock()
defer vs.mnLock.Unlock()
vs.masterNode = masterNode
}
func (vs *VolumeServer) Shutdown() { func (vs *VolumeServer) Shutdown() {
glog.V(0).Infoln("Shutting down volume server...") glog.V(0).Infoln("Shutting down volume server...")
vs.store.Close() vs.store.Close()
glog.V(0).Infoln("Shut down successfully!") glog.V(0).Infoln("Shut down successfully!")
} }
func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt {
return security.GenJwt(vs.guard.SecretKey, fileId)
}

7
go/weed/weed_server/volume_server_handlers.go

@ -58,7 +58,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
glog.V(4).Infoln("volume", volumeId, "reading", n) glog.V(4).Infoln("volume", volumeId, "reading", n)
if !vs.store.HasVolume(volumeId) { if !vs.store.HasVolume(volumeId) {
lookupResult, err := operation.Lookup(vs.masterNode, volumeId.String())
lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 { if err == nil && len(lookupResult.Locations) > 0 {
http.Redirect(w, r, "http://"+lookupResult.Locations[0].Url+r.URL.Path, http.StatusMovedPermanently) http.Redirect(w, r, "http://"+lookupResult.Locations[0].Url+r.URL.Path, http.StatusMovedPermanently)
@ -253,7 +253,8 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
ret := operation.UploadResult{} ret := operation.UploadResult{}
size, errorStatus := topology.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r)
size, errorStatus := topology.ReplicatedWrite(vs.GetMasterNode(),
vs.store, volumeId, needle, r)
httpStatus := http.StatusCreated httpStatus := http.StatusCreated
if errorStatus != "" { if errorStatus != "" {
httpStatus = http.StatusInternalServerError httpStatus = http.StatusInternalServerError
@ -290,7 +291,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
} }
n.Size = 0 n.Size = 0
ret := topology.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r)
ret := topology.ReplicatedDelete(vs.GetMasterNode(), vs.store, volumeId, n, r)
if ret != 0 { if ret != 0 {
m := make(map[string]uint32) m := make(map[string]uint32)

Loading…
Cancel
Save