diff --git a/docs/replication.rst b/docs/replication.rst index 54e4b2503..920fdbca0 100644 --- a/docs/replication.rst +++ b/docs/replication.rst @@ -10,7 +10,7 @@ Basically, the way it works is: .. code-block:: bash - ./weed master -defaultReplicationType=001 + ./weed master -defaultReplication=001 2. start volume servers as this: diff --git a/go/operation/data_struts.go b/go/operation/data_struts.go index 4980f9913..bfc53aa50 100644 --- a/go/operation/data_struts.go +++ b/go/operation/data_struts.go @@ -2,5 +2,6 @@ package operation type JoinResult struct { VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"` + SecretKey string `json:"secretKey,omitempty"` Error string `json:"error,omitempty"` } diff --git a/go/operation/delete_content.go b/go/operation/delete_content.go index 064cc8df9..afd1bbc34 100644 --- a/go/operation/delete_content.go +++ b/go/operation/delete_content.go @@ -7,6 +7,7 @@ import ( "strings" "sync" + "github.com/chrislusf/weed-fs/go/security" "github.com/chrislusf/weed-fs/go/util" ) @@ -16,12 +17,12 @@ type DeleteResult struct { Error string `json:"error,omitempty"` } -func DeleteFile(master string, fileId string) error { +func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error { fileUrl, err := LookupFileId(master, fileId) if err != nil { return err } - return util.Delete(fileUrl) + return util.Delete(fileUrl, jwt) } func ParseFileId(fid string) (vid string, key_cookie string, err error) { diff --git a/go/operation/submit.go b/go/operation/submit.go index 3ab6d78d9..03551b1e8 100644 --- a/go/operation/submit.go +++ b/go/operation/submit.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/security" ) type FilePart struct { @@ -34,7 +35,10 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } -func SubmitFiles(master string, files []FilePart, replication string, collection string, ttl string, maxMB int) ([]SubmitResult, error) { +func SubmitFiles(master string, files []FilePart, + replication string, collection string, ttl string, maxMB int, + secret security.Secret, +) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) for index, file := range files { results[index].FileName = file.FileName @@ -54,7 +58,7 @@ func SubmitFiles(master string, files []FilePart, replication string, collection file.Server = ret.PublicUrl file.Replication = replication file.Collection = collection - results[index].Size, err = file.Upload(maxMB, master) + results[index].Size, err = file.Upload(maxMB, master, secret) if err != nil { results[index].Error = err.Error() } @@ -101,7 +105,8 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) { return ret, nil } -func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error) { +func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) { + jwt := security.GenJwt(secret, fi.Fid) fileUrl := "http://" + fi.Server + "/" + fi.Fid if fi.ModTime != 0 { fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) @@ -114,16 +119,20 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error) chunks := fi.FileSize/chunkSize + 1 fids := make([]string, 0) for i := int64(0); i < chunks; i++ { - id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection, fi.Ttl) + id, count, e := upload_one_chunk( + fi.FileName+"-"+strconv.FormatInt(i+1, 10), + io.LimitReader(fi.Reader, chunkSize), + master, fi.Replication, fi.Collection, fi.Ttl, + jwt) if e != nil { return 0, e } fids = append(fids, id) retSize += count } - err = upload_file_id_list(fileUrl, fi.FileName+"-list", fids) + err = upload_file_id_list(fileUrl, fi.FileName+"-list", fids, jwt) } else { - ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType) + ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt) if e != nil { return 0, e } @@ -132,24 +141,27 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error) return } -func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string, ttl string) (fid string, size uint32, e error) { +func upload_one_chunk(filename string, reader io.Reader, master, + replication string, collection string, ttl string, jwt security.EncodedJwt, +) (fid string, size uint32, e error) { ret, err := Assign(master, 1, replication, collection, ttl) if err != nil { return "", 0, err } fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") - uploadResult, uploadError := Upload(fileUrl, filename, reader, false, "application/octet-stream") + uploadResult, uploadError := Upload(fileUrl, filename, reader, false, + "application/octet-stream", jwt) if uploadError != nil { return fid, 0, uploadError } return fid, uploadResult.Size, nil } -func upload_file_id_list(fileUrl, filename string, fids []string) error { +func upload_file_id_list(fileUrl, filename string, fids []string, jwt security.EncodedJwt) error { var buf bytes.Buffer buf.WriteString(strings.Join(fids, "\n")) glog.V(4).Info("Uploading final list ", filename, " to ", fileUrl, "...") - _, e := Upload(fileUrl, filename, &buf, false, "text/plain") + _, e := Upload(fileUrl, filename, &buf, false, "text/plain", jwt) return e } diff --git a/go/operation/upload_content.go b/go/operation/upload_content.go index 480d76dca..533be82cb 100644 --- a/go/operation/upload_content.go +++ b/go/operation/upload_content.go @@ -15,6 +15,7 @@ import ( "strings" "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/security" ) type UploadResult struct { @@ -35,13 +36,13 @@ func init() { var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") -func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string) (*UploadResult, error) { +func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) { return upload_content(uploadUrl, func(w io.Writer) (err error) { _, err = io.Copy(w, reader) return - }, filename, isGzipped, mtype) + }, filename, isGzipped, mtype, jwt) } -func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string) (*UploadResult, error) { +func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) { body_buf := bytes.NewBufferString("") body_writer := multipart.NewWriter(body_buf) h := make(textproto.MIMEHeader) @@ -55,6 +56,9 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error if isGzipped { h.Set("Content-Encoding", "gzip") } + if jwt != "" { + h.Set("Authorization", "BEARER "+string(jwt)) + } file_writer, cp_err := body_writer.CreatePart(h) if cp_err != nil { glog.V(0).Infoln("error creating form file", cp_err.Error()) diff --git a/go/security/guard.go b/go/security/guard.go index a2beb48f4..d39985034 100644 --- a/go/security/guard.go +++ b/go/security/guard.go @@ -5,11 +5,8 @@ import ( "fmt" "net" "net/http" - "strings" - "time" "github.com/chrislusf/weed-fs/go/glog" - "github.com/dgrijalva/jwt-go" ) var ( @@ -44,24 +41,24 @@ https://github.com/pkieltyka/jwtauth/blob/master/jwtauth.go */ type Guard struct { whiteList []string - secretKey string + SecretKey Secret isActive bool } func NewGuard(whiteList []string, secretKey string) *Guard { - g := &Guard{whiteList: whiteList, secretKey: secretKey} - g.isActive = len(g.whiteList) != 0 || len(g.secretKey) != 0 + g := &Guard{whiteList: whiteList, SecretKey: Secret(secretKey)} + g.isActive = len(g.whiteList) != 0 || len(g.SecretKey) != 0 return g } -func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { +func (g *Guard) WhiteList(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { if !g.isActive { //if no security needed, just skip all checkings return f } return func(w http.ResponseWriter, r *http.Request) { - if err := g.doCheck(w, r); err != nil { + if err := g.checkWhiteList(w, r); err != nil { w.WriteHeader(http.StatusUnauthorized) return } @@ -69,76 +66,62 @@ func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w ht } } -func (g *Guard) NewToken() (tokenString string, err error) { - m := make(map[string]interface{}) - m["exp"] = time.Now().Unix() + 10 - return g.Encode(m) -} - -func (g *Guard) Encode(claims map[string]interface{}) (tokenString string, err error) { +func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { if !g.isActive { - return "", nil + //if no security needed, just skip all checkings + return f + } + return func(w http.ResponseWriter, r *http.Request) { + if err := g.checkJwt(w, r); err != nil { + w.WriteHeader(http.StatusUnauthorized) + return + } + f(w, r) } - - t := jwt.New(jwt.GetSigningMethod("HS256")) - t.Claims = claims - return t.SignedString(g.secretKey) } -func (g *Guard) Decode(tokenString string) (token *jwt.Token, err error) { - return jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) { - return g.secretKey, nil - }) -} +func (g *Guard) checkWhiteList(w http.ResponseWriter, r *http.Request) error { + if len(g.whiteList) == 0 { + return nil + } -func (g *Guard) doCheck(w http.ResponseWriter, r *http.Request) error { - if len(g.whiteList) != 0 { - host, _, err := net.SplitHostPort(r.RemoteAddr) - if err == nil { - for _, ip := range g.whiteList { - if ip == host { - return nil - } + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err == nil { + for _, ip := range g.whiteList { + if ip == host { + return nil } } } - if len(g.secretKey) != 0 { - - // Get token from query params - tokenStr := r.URL.Query().Get("jwt") + glog.V(1).Infof("Not in whitelist: %s", r.RemoteAddr) + return fmt.Errorf("Not in whitelis: %s", r.RemoteAddr) +} - // Get token from authorization header - if tokenStr == "" { - bearer := r.Header.Get("Authorization") - if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" { - tokenStr = bearer[7:] - } - } +func (g *Guard) checkJwt(w http.ResponseWriter, r *http.Request) error { + if g.checkWhiteList(w, r) == nil { + return nil + } - // Get token from cookie - if tokenStr == "" { - cookie, err := r.Cookie("jwt") - if err == nil { - tokenStr = cookie.Value - } - } + if len(g.SecretKey) == 0 { + return nil + } - if tokenStr == "" { - return ErrUnauthorized - } + tokenStr := GetJwt(r) - // Verify the token - token, err := g.Decode(tokenStr) - if err != nil { - glog.V(1).Infof("Token verification error from %s: %v", r.RemoteAddr, err) - return ErrUnauthorized - } - if !token.Valid { - glog.V(1).Infof("Token invliad from %s: %v", r.RemoteAddr, tokenStr) - return ErrUnauthorized - } + if tokenStr == "" { + return ErrUnauthorized + } + // Verify the token + token, err := DecodeJwt(g.SecretKey, tokenStr) + if err != nil { + glog.V(1).Infof("Token verification error from %s: %v", r.RemoteAddr, err) + return ErrUnauthorized + } + if !token.Valid { + glog.V(1).Infof("Token invliad from %s: %v", r.RemoteAddr, tokenStr) + return ErrUnauthorized } glog.V(1).Infof("No permission from %s", r.RemoteAddr) diff --git a/go/security/jwt.go b/go/security/jwt.go new file mode 100644 index 000000000..fac91dd8e --- /dev/null +++ b/go/security/jwt.go @@ -0,0 +1,72 @@ +package security + +import ( + "net/http" + "strings" + + "time" + + "github.com/chrislusf/weed-fs/go/glog" + jwt "github.com/dgrijalva/jwt-go" +) + +type EncodedJwt string +type Secret string + +func GenJwt(secret Secret, fileId string) EncodedJwt { + if secret == "" { + return "" + } + + t := jwt.New(jwt.GetSigningMethod("HS256")) + t.Claims["exp"] = time.Now().Unix() + 10 + t.Claims["sub"] = fileId + encoded, e := t.SignedString(secret) + if e != nil { + glog.V(0).Infof("Failed to sign claims: %v", t.Claims) + return "" + } + return EncodedJwt(encoded) +} + +func GetJwt(r *http.Request) EncodedJwt { + + // Get token from query params + tokenStr := r.URL.Query().Get("jwt") + + // Get token from authorization header + if tokenStr == "" { + bearer := r.Header.Get("Authorization") + if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" { + tokenStr = bearer[7:] + } + } + + // Get token from cookie + if tokenStr == "" { + cookie, err := r.Cookie("jwt") + if err == nil { + tokenStr = cookie.Value + } + } + + return EncodedJwt(tokenStr) +} + +func EncodeJwt(secret Secret, claims map[string]interface{}) (EncodedJwt, error) { + if secret == "" { + return "", nil + } + + t := jwt.New(jwt.GetSigningMethod("HS256")) + t.Claims = claims + encoded, e := t.SignedString(secret) + return EncodedJwt(encoded), e +} + +func DecodeJwt(secret Secret, tokenString EncodedJwt) (token *jwt.Token, err error) { + // check exp, nbf + return jwt.Parse(string(tokenString), func(token *jwt.Token) (interface{}, error) { + return secret, nil + }) +} diff --git a/go/storage/store.go b/go/storage/store.go index 2c4434b81..d280175f2 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -11,6 +11,7 @@ import ( "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/security" "github.com/chrislusf/weed-fs/go/util" "github.com/golang/protobuf/proto" ) @@ -260,7 +261,7 @@ func (s *Store) SetRack(rack string) { func (s *Store) SetBootstrapMaster(bootstrapMaster string) { s.masterNodes = NewMasterNodes(bootstrapMaster) } -func (s *Store) Join() (masterNode string, e error) { +func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) { masterNode, e = s.masterNodes.findMaster() if e != nil { return @@ -314,22 +315,23 @@ func (s *Store) Join() (masterNode string, e error) { data, err := proto.Marshal(joinMessage) if err != nil { - return "", err + return "", "", err } jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data) if err != nil { s.masterNodes.reset() - return "", err + return "", "", err } var ret operation.JoinResult if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return masterNode, err + return masterNode, "", err } if ret.Error != "" { - return masterNode, errors.New(ret.Error) + return masterNode, "", errors.New(ret.Error) } s.volumeSizeLimit = ret.VolumeSizeLimit + secretKey = security.Secret(ret.SecretKey) s.connected = true return } @@ -353,7 +355,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { } if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) { glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit) - if _, e := s.Join(); e != nil { + if _, _, e := s.Join(); e != nil { glog.V(0).Infoln("error when reporting size:", e) } } diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go index 0c52f9d30..da426e587 100644 --- a/go/topology/store_replicate.go +++ b/go/topology/store_replicate.go @@ -7,11 +7,18 @@ import ( "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/security" "github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/util" ) -func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) { +func ReplicatedWrite(masterNode string, s *storage.Store, + volumeId storage.VolumeId, needle *storage.Needle, + r *http.Request) (size uint32, errorStatus string) { + + //check JWT + jwt := security.GetJwt(r) + ret, err := s.Write(volumeId, needle) needToReplicate := !s.HasVolume(volumeId) if err != nil { @@ -27,7 +34,10 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum if needToReplicate { //send to other replica locations if r.FormValue("type") != "replicate" { if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { - _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime)) + _, err := operation.Upload( + "http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), + string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), + jwt) return err == nil }) { ret = 0 @@ -41,7 +51,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum volumeId.String() + ": " + err.Error() } else { distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { - return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate") + return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt) }) } } @@ -49,7 +59,13 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum return } -func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId, n *storage.Needle, r *http.Request) (ret uint32) { +func ReplicatedDelete(masterNode string, store *storage.Store, + volumeId storage.VolumeId, n *storage.Needle, + r *http.Request) (ret uint32) { + + //check JWT + jwt := security.GetJwt(r) + ret, err := store.Delete(volumeId, n) if err != nil { glog.V(0).Infoln("delete error:", err) @@ -63,7 +79,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage. if needToReplicate { //send to other replica locations if r.FormValue("type") != "replicate" { if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool { - return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate") + return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt) }) { ret = 0 } diff --git a/go/util/constants.go b/go/util/constants.go index 1aa59d634..ca801574c 100644 --- a/go/util/constants.go +++ b/go/util/constants.go @@ -1,5 +1,5 @@ package util const ( - VERSION = "0.68" + VERSION = "0.69 beta" ) diff --git a/go/util/http_util.go b/go/util/http_util.go index 08de56ba9..72cab76e1 100644 --- a/go/util/http_util.go +++ b/go/util/http_util.go @@ -7,6 +7,8 @@ import ( "net/http" "net/url" "strings" + + "github.com/chrislusf/weed-fs/go/security" ) var ( @@ -63,8 +65,11 @@ func Get(url string) ([]byte, error) { return b, nil } -func Delete(url string) error { +func Delete(url string, jwt security.EncodedJwt) error { req, err := http.NewRequest("DELETE", url, nil) + if jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(jwt)) + } if err != nil { return err } diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go index 04ab4307d..5a91d9d58 100644 --- a/go/weed/benchmark.go +++ b/go/weed/benchmark.go @@ -16,6 +16,7 @@ import ( "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/security" "github.com/chrislusf/weed-fs/go/util" ) @@ -32,6 +33,7 @@ type BenchmarkOptions struct { collection *string cpuprofile *string maxCpu *int + secretKey *string vid2server map[string]string //cache for vid locations } @@ -56,6 +58,7 @@ func init() { b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + b.secretKey = cmdBenchmark.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") b.vid2server = make(map[string]string) sharedBytes = make([]byte, 1024) } @@ -181,6 +184,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { defer wait.Done() delayedDeleteChan := make(chan *delayedFile, 100) var waitForDeletions sync.WaitGroup + secret := security.Secret(*b.secretKey) + for i := 0; i < 7; i++ { waitForDeletions.Add(1) go func() { @@ -189,7 +194,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { if df.enterTime.After(time.Now()) { time.Sleep(df.enterTime.Sub(time.Now())) } - if e := util.Delete("http://" + df.fp.Server + "/" + df.fp.Fid); e == nil { + if e := util.Delete("http://"+df.fp.Server+"/"+df.fp.Fid, + security.GenJwt(secret, df.fp.Fid)); e == nil { s.completed++ } else { s.failed++ @@ -204,7 +210,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize} if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil { fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection - if _, err := fp.Upload(0, *b.server); err == nil { + if _, err := fp.Upload(0, *b.server, secret); err == nil { if rand.Intn(100) < *b.deletePercentage { s.total++ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} diff --git a/go/weed/filer.go b/go/weed/filer.go index 4e7191e34..fd7dcdf88 100644 --- a/go/weed/filer.go +++ b/go/weed/filer.go @@ -22,6 +22,7 @@ type FilerOptions struct { defaultReplicaPlacement *string dir *string redirectOnRead *bool + secretKey *string cassandra_server *string cassandra_keyspace *string redis_server *string @@ -40,6 +41,8 @@ func init() { f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server") f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server") + f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") + } var cmdFiler = &Command{ @@ -73,6 +76,7 @@ func runFiler(cmd *Command, args []string) bool { r := http.NewServeMux() _, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection, *f.defaultReplicaPlacement, *f.redirectOnRead, + *f.secretKey, *f.cassandra_server, *f.cassandra_keyspace, *f.redis_server, *f.redis_database, ) diff --git a/go/weed/master.go b/go/weed/master.go index af63d8c22..1e2a6f0af 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -29,8 +29,9 @@ var cmdMaster = &Command{ var ( mport = cmdMaster.Flag.Int("port", 9333, "http listen port") - masterIp = cmdMaster.Flag.String("ip", "", "master listening ip address, default to listen on all network interfaces") + masterIp = cmdMaster.Flag.String("ip", "localhost", "master | address") masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + mPublicUrl = cmdMaster.Flag.String("publicUrl", "", "peer accessible |:port") metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list") volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") @@ -41,7 +42,7 @@ var ( mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") - masterSecureKey = cmdMaster.Flag.String("secure.key", "", "secret key to check permission") + masterSecureKey = cmdMaster.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") masterWhiteList []string ) @@ -75,10 +76,10 @@ func runMaster(cmd *Command, args []string) bool { go func() { time.Sleep(100 * time.Millisecond) - if *masterIp == "" { - *masterIp = "localhost" - } myMasterAddress := *masterIp + ":" + strconv.Itoa(*mport) + if *mPublicUrl != "" { + myMasterAddress = *mPublicUrl + } var peers []string if *masterPeers != "" { peers = strings.Split(*masterPeers, ",") diff --git a/go/weed/mount_std.go b/go/weed/mount_std.go index 808c6c563..8b5ffefcb 100644 --- a/go/weed/mount_std.go +++ b/go/weed/mount_std.go @@ -13,6 +13,7 @@ import ( "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/util" + "golang.org/x/net/context" ) func runMount(cmd *Command, args []string) bool { @@ -55,7 +56,7 @@ type File struct { func (File) Attr() fuse.Attr { return fuse.Attr{Mode: 0444} } -func (File) ReadAll(intr fs.Intr) ([]byte, fuse.Error) { +func (File) ReadAll(ctx context.Context) ([]byte, error) { return []byte("hello, world\n"), nil } @@ -68,7 +69,7 @@ func (dir Dir) Attr() fuse.Attr { return fuse.Attr{Inode: dir.Id, Mode: os.ModeDir | 0555} } -func (dir Dir) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) { +func (dir Dir) Lookup(ctx context.Context, name string) (fs.Node, error) { files_result, e := filer.ListFiles(*mountOptions.filer, dir.Path, name) if e != nil { return nil, fuse.ENOENT @@ -81,11 +82,11 @@ func (dir Dir) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) { type WFS struct{} -func (WFS) Root() (fs.Node, fuse.Error) { +func (WFS) Root() (fs.Node, error) { return Dir{}, nil } -func (dir *Dir) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) { +func (dir *Dir) ReadDir(ctx context.Context) ([]fuse.Dirent, error) { ret := make([]fuse.Dirent, 0) if dirs, e := filer.ListDirectories(*mountOptions.filer, dir.Path); e == nil { for _, d := range dirs.Directories { diff --git a/go/weed/server.go b/go/weed/server.go index b779033cb..a758f887f 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -47,7 +47,7 @@ var cmdServer = &Command{ } var ( - serverIp = cmdServer.Flag.String("ip", "", "ip or server name") + serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name") serverPublicUrl = cmdServer.Flag.String("publicUrl", "", "publicly accessible address") serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") @@ -56,7 +56,7 @@ var ( serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") serverPeers = cmdServer.Flag.String("master.peers", "", "other master nodes in comma separated ip:masterPort list") - serverSecureKey = cmdServer.Flag.String("secure.key", "", "secret key to ensure authenticated access") + serverSecureKey = cmdServer.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") @@ -86,10 +86,10 @@ func init() { filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server") - } func runServer(cmd *Command, args []string) bool { + filerOptions.secretKey = serverSecureKey if *serverOptions.cpuprofile != "" { f, err := os.Create(*serverOptions.cpuprofile) if err != nil { @@ -99,10 +99,6 @@ func runServer(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - if *serverIp == "" { - *serverIp = "localhost" - } - if *filerOptions.redirectOnRead { *isStartingFiler = true } @@ -145,13 +141,13 @@ func runServer(cmd *Command, args []string) bool { *filerOptions.dir = *masterMetaFolder + "/filer" os.MkdirAll(*filerOptions.dir, 0700) } + if err := util.TestFolderWritable(*filerOptions.dir); err != nil { + glog.Fatalf("Check Mapping Meta Folder (-filer.dir=\"%s\") Writable: %s", *filerOptions.dir, err) + } } if err := util.TestFolderWritable(*masterMetaFolder); err != nil { glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err) } - if err := util.TestFolderWritable(*filerOptions.dir); err != nil { - glog.Fatalf("Check Mapping Meta Folder (-filer.dir=\"%s\") Writable: %s", *filerOptions.dir, err) - } if *serverWhiteListOption != "" { serverWhiteList = strings.Split(*serverWhiteListOption, ",") @@ -162,6 +158,7 @@ func runServer(cmd *Command, args []string) bool { r := http.NewServeMux() _, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, *filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead, + *filerOptions.secretKey, "", "", "", 0, ) diff --git a/go/weed/upload.go b/go/weed/upload.go index 2d67c0bd9..eff259d1f 100644 --- a/go/weed/upload.go +++ b/go/weed/upload.go @@ -7,6 +7,7 @@ import ( "path/filepath" "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/security" ) var ( @@ -15,6 +16,7 @@ var ( uploadDir *string uploadTtl *string include *string + uploadSecretKey *string maxMB *int ) @@ -28,13 +30,14 @@ func init() { uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name") uploadTtl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit") + uploadSecretKey = cmdUpload.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") } var cmdUpload = &Command{ UsageLine: "upload -server=localhost:9333 file1 [file2 file3]\n upload -server=localhost:9333 -dir=one_directory -include=*.pdf", Short: "upload one or a list of files", Long: `upload one or a list of files, or batch upload one whole folder recursively. - + If uploading a list of files: It uses consecutive file keys for the list of files. e.g. If the file1 uses key k, file2 can be read via k_1 @@ -42,18 +45,19 @@ var cmdUpload = &Command{ If uploading a whole folder recursively: All files under the folder and subfolders will be uploaded, each with its own file key. Optional parameter "-include" allows you to specify the file name patterns. - + If any file has a ".gz" extension, the content are considered gzipped already, and will be stored as is. This can save volume server's gzipped processing and allow customizable gzip compression level. The file name will strip out ".gz" and stored. For example, "jquery.js.gz" will be stored as "jquery.js". - - If "maxMB" is set to a positive number, files larger than it would be split into chunks and uploaded separatedly. - The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned. + + If "maxMB" is set to a positive number, files larger than it would be split into chunks and uploaded separatedly. + The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned. `, } func runUpload(cmd *Command, args []string) bool { + secret := security.Secret(*uploadSecretKey) if len(cmdUpload.Flag.Args()) == 0 { if *uploadDir == "" { return false @@ -70,7 +74,9 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB) + results, e := operation.SubmitFiles(*server, parts, + *uploadReplication, *uploadCollection, + *uploadTtl, *maxMB, secret) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -87,7 +93,9 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { fmt.Println(e.Error()) } - results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB) + results, _ := operation.SubmitFiles(*server, parts, + *uploadReplication, *uploadCollection, + *uploadTtl, *maxMB, secret) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) } diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index a2d93c246..095652a6b 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -12,6 +12,7 @@ import ( "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/security" "github.com/chrislusf/weed-fs/go/stats" "github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/util" @@ -75,6 +76,7 @@ func debug(params ...interface{}) { } func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { + jwt := security.GetJwt(r) m := make(map[string]interface{}) if r.Method != "POST" { writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) @@ -102,7 +104,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("upload file to store", url) - uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType) + uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, jwt) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return diff --git a/go/weed/weed_server/filer_server.go b/go/weed/weed_server/filer_server.go index b43e1965b..1309e4486 100644 --- a/go/weed/weed_server/filer_server.go +++ b/go/weed/weed_server/filer_server.go @@ -10,6 +10,7 @@ import ( "github.com/chrislusf/weed-fs/go/filer/flat_namespace" "github.com/chrislusf/weed-fs/go/filer/redis_store" "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/security" ) type FilerServer struct { @@ -18,11 +19,13 @@ type FilerServer struct { collection string defaultReplication string redirectOnRead bool + secret security.Secret filer filer.Filer } func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string, replication string, redirectOnRead bool, + secret string, cassandra_server string, cassandra_keyspace string, redis_server string, redis_database int, ) (fs *FilerServer, err error) { @@ -56,3 +59,7 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle return fs, nil } + +func (fs *FilerServer) jwt(fileId string) security.EncodedJwt { + return security.GenJwt(fs.secret, fileId) +} diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index ac894771a..6278e5dad 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -170,7 +170,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if ret.Name != "" { path += ret.Name } else { - operation.DeleteFile(fs.master, assignResult.Fid) //clean up + operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") writeJsonError(w, r, http.StatusInternalServerError, errors.New("Can not to write to folder "+path+" without a file name")) @@ -179,7 +179,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } glog.V(4).Infoln("saving", path, "=>", assignResult.Fid) if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil { - operation.DeleteFile(fs.master, assignResult.Fid) //clean up + operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err) return @@ -199,7 +199,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { } else { fid, err = fs.filer.DeleteFile(r.URL.Path) if err == nil { - err = operation.DeleteFile(fs.master, fid) + err = operation.DeleteFile(fs.master, fid, fs.jwt(fid)) } } if err == nil { diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index 056b1fe7b..dc79c733a 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -23,6 +23,7 @@ type MasterServer struct { pulseSeconds int defaultReplicaPlacement string garbageThreshold string + guard *security.Guard Topo *topology.Topology vg *topology.VolumeGrowth @@ -57,22 +58,22 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, ms.vg = topology.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") - guard := security.NewGuard(whiteList, secureKey) + ms.guard = security.NewGuard(whiteList, secureKey) - r.HandleFunc("/dir/assign", ms.proxyToLeader(guard.Secure(ms.dirAssignHandler))) - r.HandleFunc("/dir/lookup", ms.proxyToLeader(guard.Secure(ms.dirLookupHandler))) - r.HandleFunc("/dir/join", ms.proxyToLeader(guard.Secure(ms.dirJoinHandler))) - r.HandleFunc("/dir/status", ms.proxyToLeader(guard.Secure(ms.dirStatusHandler))) - r.HandleFunc("/col/delete", ms.proxyToLeader(guard.Secure(ms.collectionDeleteHandler))) - r.HandleFunc("/vol/lookup", ms.proxyToLeader(guard.Secure(ms.volumeLookupHandler))) - r.HandleFunc("/vol/grow", ms.proxyToLeader(guard.Secure(ms.volumeGrowHandler))) - r.HandleFunc("/vol/status", ms.proxyToLeader(guard.Secure(ms.volumeStatusHandler))) - r.HandleFunc("/vol/vacuum", ms.proxyToLeader(guard.Secure(ms.volumeVacuumHandler))) - r.HandleFunc("/submit", guard.Secure(ms.submitFromMasterServerHandler)) - r.HandleFunc("/delete", guard.Secure(ms.deleteFromMasterServerHandler)) + r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) + r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) + r.HandleFunc("/dir/join", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoinHandler))) + r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) + r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) + r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeLookupHandler))) + r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) + r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) + r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) + r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) + r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler)) r.HandleFunc("/{fileId}", ms.redirectHandler) - r.HandleFunc("/stats/counter", guard.Secure(statsCounterHandler)) - r.HandleFunc("/stats/memory", guard.Secure(statsMemoryHandler)) + r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) + r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) ms.Topo.StartRefreshWritableVolumes(garbageThreshold) diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index 33e45afd2..9d9880a6a 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -58,7 +58,10 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { } ms.Topo.ProcessJoinMessage(joinMessage) - writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024}) + writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{ + VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, + SecretKey: string(ms.guard.SecretKey), + }) } func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index f3ad2974d..177514920 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -3,6 +3,7 @@ package weed_server import ( "math/rand" "net/http" + "sync" "time" "github.com/chrislusf/weed-fs/go/glog" @@ -12,6 +13,7 @@ import ( type VolumeServer struct { masterNode string + mnLock sync.RWMutex pulseSeconds int dataCenter string rack string @@ -29,40 +31,42 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string, whiteList []string, fixJpgOrientation bool) *VolumeServer { vs := &VolumeServer{ - masterNode: masterNode, pulseSeconds: pulseSeconds, dataCenter: dataCenter, rack: rack, FixJpgOrientation: fixJpgOrientation, } + vs.SetMasterNode(masterNode) vs.store = storage.NewStore(port, adminPort, ip, publicUrl, folders, maxCounts) vs.guard = security.NewGuard(whiteList, "") - adminMux.HandleFunc("/status", vs.guard.Secure(vs.statusHandler)) - adminMux.HandleFunc("/admin/assign_volume", vs.guard.Secure(vs.assignVolumeHandler)) - adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.Secure(vs.vacuumVolumeCheckHandler)) - adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.Secure(vs.vacuumVolumeCompactHandler)) - adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.Secure(vs.vacuumVolumeCommitHandler)) - adminMux.HandleFunc("/admin/freeze_volume", vs.guard.Secure(vs.freezeVolumeHandler)) - adminMux.HandleFunc("/admin/delete_collection", vs.guard.Secure(vs.deleteCollectionHandler)) - adminMux.HandleFunc("/stats/counter", vs.guard.Secure(statsCounterHandler)) - adminMux.HandleFunc("/stats/memory", vs.guard.Secure(statsMemoryHandler)) - adminMux.HandleFunc("/stats/disk", vs.guard.Secure(vs.statsDiskHandler)) + adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) + adminMux.HandleFunc("/admin/assign_volume", vs.guard.WhiteList(vs.assignVolumeHandler)) + adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler)) + adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler)) + adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler)) + adminMux.HandleFunc("/admin/freeze_volume", vs.guard.WhiteList(vs.freezeVolumeHandler)) + adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler)) + adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) + adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) + adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) publicMux.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler)) publicMux.HandleFunc("/", vs.storeHandler) go func() { connected := true - vs.store.SetBootstrapMaster(vs.masterNode) + + vs.store.SetBootstrapMaster(vs.GetMasterNode()) vs.store.SetDataCenter(vs.dataCenter) vs.store.SetRack(vs.rack) for { - master, err := vs.store.Join() + master, secretKey, err := vs.store.Join() if err == nil { if !connected { connected = true - vs.masterNode = master + vs.SetMasterNode(master) + vs.guard.SecretKey = secretKey glog.V(0).Infoln("Volume Server Connected with master at", master) } } else { @@ -82,8 +86,24 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string, return vs } +func (vs *VolumeServer) GetMasterNode() string { + vs.mnLock.RLock() + defer vs.mnLock.RUnlock() + return vs.masterNode +} + +func (vs *VolumeServer) SetMasterNode(masterNode string) { + vs.mnLock.Lock() + defer vs.mnLock.Unlock() + vs.masterNode = masterNode +} + func (vs *VolumeServer) Shutdown() { glog.V(0).Infoln("Shutting down volume server...") vs.store.Close() glog.V(0).Infoln("Shut down successfully!") } + +func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt { + return security.GenJwt(vs.guard.SecretKey, fileId) +} diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go index c2c9e8523..d3fdf0cb2 100644 --- a/go/weed/weed_server/volume_server_handlers.go +++ b/go/weed/weed_server/volume_server_handlers.go @@ -58,7 +58,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) glog.V(4).Infoln("volume", volumeId, "reading", n) if !vs.store.HasVolume(volumeId) { - lookupResult, err := operation.Lookup(vs.masterNode, volumeId.String()) + lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String()) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) if err == nil && len(lookupResult.Locations) > 0 { http.Redirect(w, r, "http://"+lookupResult.Locations[0].Url+r.URL.Path, http.StatusMovedPermanently) @@ -253,7 +253,8 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } ret := operation.UploadResult{} - size, errorStatus := topology.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r) + size, errorStatus := topology.ReplicatedWrite(vs.GetMasterNode(), + vs.store, volumeId, needle, r) httpStatus := http.StatusCreated if errorStatus != "" { httpStatus = http.StatusInternalServerError @@ -290,7 +291,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { } n.Size = 0 - ret := topology.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r) + ret := topology.ReplicatedDelete(vs.GetMasterNode(), vs.store, volumeId, n, r) if ret != 0 { m := make(map[string]uint32)