Browse Source

Merge pull request #3 from chrislusf/master

merge
pull/60/head
yourchanges 10 years ago
parent
commit
47bc0f2f29
  1. 2
      .gitignore
  2. 3
      docs/changelist.rst
  3. 2
      go/filer/embedded_filer/directory_in_map.go
  4. 33
      go/operation/lookup.go
  5. 4
      go/storage/cdb_map.go
  6. 6
      go/storage/needle_map.go
  7. 4
      go/storage/needle_read_write.go
  8. 2
      go/storage/store.go
  9. 6
      go/storage/volume.go
  10. 3
      go/storage/volume_info.go
  11. 6
      go/storage/volume_super_block.go
  12. 2
      go/topology/configuration_test.go
  13. 2
      go/topology/topology.go
  14. 2
      go/topology/volume_growth.go
  15. 2
      go/util/constants.go
  16. 6
      go/weed/filer.go
  17. 4
      go/weed/master.go
  18. 19
      go/weed/server.go
  19. 102
      go/weed/volume.go
  20. 2
      go/weed/weed_server/common.go
  21. 2
      go/weed/weed_server/filer_server.go
  22. 2
      go/weed/weed_server/master_server_handlers_admin.go
  23. 26
      go/weed/weed_server/volume_server.go
  24. 2
      go/weed/weed_server/volume_server_handlers_helper.go
  25. 19
      note/security.txt

2
.gitignore

@ -1,3 +1,3 @@
weed
go/weed/.goxc*
tags
*.swp

3
docs/changelist.rst

@ -5,6 +5,9 @@ Introduction
############
This file contains list of recent changes, important features, usage changes, data format changes, etc. Do read this if you upgrade.
v0.68
#####
1. Filer supports storing file~file_id mapping to remote key-value storage Redis, Cassandra. So multiple filers are supported.
v0.67
#####

2
go/filer/embedded_filer/directory_in_map.go

@ -62,7 +62,7 @@ func NewDirectoryManagerInMap(dirLogFile string) (dm *DirectoryManagerInMap, err
//dm.Root do not use NewDirectoryEntryInMap, since dm.max will be changed
dm.Root = &DirectoryEntryInMap{SubDirectories: make(map[string]*DirectoryEntryInMap)}
if dm.logFile, err = os.OpenFile(dirLogFile, os.O_RDWR|os.O_CREATE, 0644); err != nil {
return nil, fmt.Errorf("cannot write directory log file %s.idx: %s", dirLogFile, err.Error())
return nil, fmt.Errorf("cannot write directory log file %s.idx: %v", dirLogFile, err)
}
return dm, dm.load()
}

33
go/operation/lookup.go

@ -27,14 +27,14 @@ func (lr *LookupResult) String() string {
}
var (
vc VidCache
vc VidCache // caching of volume locations, re-check if after 10 minutes
)
func Lookup(server string, vid string) (ret *LookupResult, err error) {
locations, cache_err := vc.Get(vid)
if cache_err != nil {
if ret, err = do_lookup(server, vid); err == nil {
vc.Set(vid, ret.Locations, 1*time.Minute)
vc.Set(vid, ret.Locations, 10*time.Minute)
}
} else {
ret = &LookupResult{VolumeId: vid, Locations: locations}
@ -75,19 +75,44 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) {
return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl + "/" + fileId, nil
}
// LookupVolumeIds find volume locations by cache and actual lookup
func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) {
values := make(url.Values)
ret := make(map[string]LookupResult)
var unknown_vids []string
//check vid cache first
for _, vid := range vids {
locations, cache_err := vc.Get(vid)
if cache_err == nil {
ret[vid] = LookupResult{VolumeId: vid, Locations: locations}
} else {
unknown_vids = append(unknown_vids, vid)
}
}
//return success if all volume ids are known
if len(unknown_vids) == 0 {
return ret, nil
}
//only query unknown_vids
values := make(url.Values)
for _, vid := range unknown_vids {
values.Add("volumeId", vid)
}
jsonBlob, err := util.Post("http://"+server+"/vol/lookup", values)
if err != nil {
return nil, err
}
ret := make(map[string]LookupResult)
err = json.Unmarshal(jsonBlob, &ret)
if err != nil {
return nil, errors.New(err.Error() + " " + string(jsonBlob))
}
//set newly checked vids to cache
for _, vid := range unknown_vids {
locations := ret[vid].Locations
vc.Set(vid, locations, 10*time.Minute)
}
return ret, nil
}

4
go/storage/cdb_map.go

@ -214,12 +214,12 @@ func DumpNeedleMapToCdb(cdbName string, nm *NeedleMap) error {
func openTempCdb(fileName string) (cdb.AdderFunc, cdb.CloserFunc, error) {
fh, err := os.Create(fileName)
if err != nil {
return nil, nil, fmt.Errorf("cannot create cdb file %s: %s", fileName, err.Error())
return nil, nil, fmt.Errorf("cannot create cdb file %s: %v", fileName, err)
}
adder, closer, err := cdb.MakeFactory(fh)
if err != nil {
fh.Close()
return nil, nil, fmt.Errorf("error creating factory: %s", err.Error())
return nil, nil, fmt.Errorf("error creating factory: %v", err)
}
return adder, func() error {
if e := closer(); e != nil {

6
go/storage/needle_map.go

@ -126,7 +126,7 @@ func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
}
if _, err := nm.indexFile.Seek(0, 2); err != nil {
return 0, fmt.Errorf("cannot go to the end of indexfile %s: %s", nm.indexFile.Name(), err.Error())
return 0, fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err)
}
return nm.indexFile.Write(bytes)
}
@ -141,10 +141,10 @@ func (nm *NeedleMap) Delete(key uint64) error {
util.Uint32toBytes(bytes[8:12], 0)
util.Uint32toBytes(bytes[12:16], 0)
if _, err := nm.indexFile.Seek(0, 2); err != nil {
return fmt.Errorf("cannot go to the end of indexfile %s: %s", nm.indexFile.Name(), err.Error())
return fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err)
}
if _, err := nm.indexFile.Write(bytes); err != nil {
return fmt.Errorf("error writing to indexfile %s: %s", nm.indexFile.Name(), err.Error())
return fmt.Errorf("error writing to indexfile %s: %v", nm.indexFile.Name(), err)
}
nm.DeletionCounter++
return nil

4
go/storage/needle_read_write.go

@ -30,12 +30,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
defer func(s io.Seeker, off int64) {
if err != nil {
if _, e = s.Seek(off, 0); e != nil {
glog.V(0).Infof("Failed to seek %s back to %d with error: %s", w, off, e.Error())
glog.V(0).Infof("Failed to seek %s back to %d with error: %v", w, off, e)
}
}
}(s, end)
} else {
err = fmt.Errorf("Cnnot Read Current Volume Position: %s", e.Error())
err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
return
}
}

2
go/storage/store.go

@ -64,7 +64,7 @@ func (mn *MasterNodes) findMaster() (string, error) {
}
}
if mn.lastNode < 0 {
return "", errors.New("No master node avalable!")
return "", errors.New("No master node available!")
}
return mn.nodes[mn.lastNode], nil
}

6
go/storage/volume.go

@ -175,7 +175,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
}
var offset int64
if offset, err = v.dataFile.Seek(0, 2); err != nil {
glog.V(0).Infof("faile to seek the end of file: %v", err)
glog.V(0).Infof("failed to seek the end of file: %v", err)
return
}
@ -287,10 +287,10 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId,
visitNeedle func(n *Needle, offset int64) error) (err error) {
var v *Volume
if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
return errors.New("Failed to load volume:" + err.Error())
return fmt.Errorf("Failed to load volume %d: %v", id, err)
}
if err = visitSuperBlock(v.SuperBlock); err != nil {
return errors.New("Failed to read super block:" + err.Error())
return fmt.Errorf("Failed to read volume %d super block: %v", id, err)
}
version := v.Version()

3
go/storage/volume_info.go

@ -39,5 +39,6 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er
}
func (vi VolumeInfo) String() string {
return fmt.Sprintf("Id:%s, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v", vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly)
return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v",
vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly)
}

6
go/storage/volume_super_block.go

@ -38,7 +38,7 @@ func (s *SuperBlock) Bytes() []byte {
func (v *Volume) maybeWriteSuperBlock() error {
stat, e := v.dataFile.Stat()
if e != nil {
glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile, e)
return e
}
if stat.Size() == 0 {
@ -57,11 +57,11 @@ func (v *Volume) maybeWriteSuperBlock() error {
}
func (v *Volume) readSuperBlock() (err error) {
if _, err = v.dataFile.Seek(0, 0); err != nil {
return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error())
return fmt.Errorf("cannot seek to the beginning of %s: %v", v.dataFile.Name(), err)
}
header := make([]byte, SuperBlockSize)
if _, e := v.dataFile.Read(header); e != nil {
return fmt.Errorf("cannot read superblock: %s", e.Error())
return fmt.Errorf("cannot read volume %d super block: %v", v.Id, e)
}
v.SuperBlock, err = ParseSuperBlock(header)
return err

2
go/topology/configuration_test.go

@ -33,7 +33,7 @@ func TestLoadConfiguration(t *testing.T) {
fmt.Printf("%s\n", c)
if err != nil {
t.Fatalf("unmarshal error:%s", err.Error())
t.Fatalf("unmarshal error:%v", err)
}
if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" {

2
go/topology/topology.go

@ -119,7 +119,7 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) {
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
return "", 0, nil, errors.New("No writable volumes available!")
}
fileId, count := t.Sequence.NextFileId(count)
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil

2
go/topology/volume_growth.go

@ -204,7 +204,7 @@ func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *Volum
glog.V(0).Infoln("Created Volume", vid, "on", server)
} else {
glog.V(0).Infoln("Failed to assign", vid, "to", servers, "error", err)
return fmt.Errorf("Failed to assign %s: %s", vid.String(), err.Error())
return fmt.Errorf("Failed to assign %d: %v", vid, err)
}
}
return nil

2
go/util/constants.go

@ -1,5 +1,5 @@
package util
const (
VERSION = "0.67"
VERSION = "0.68"
)

6
go/weed/filer.go

@ -77,7 +77,7 @@ func runFiler(cmd *Command, args []string) bool {
*f.redis_server, *f.redis_database,
)
if nfs_err != nil {
glog.Fatalf(nfs_err.Error())
glog.Fatalf("Filer startup error: %v", nfs_err)
}
glog.V(0).Infoln("Start Seaweed Filer", util.VERSION, "at port", strconv.Itoa(*f.port))
filerListener, e := util.NewListener(
@ -85,10 +85,10 @@ func runFiler(cmd *Command, args []string) bool {
time.Duration(10)*time.Second,
)
if e != nil {
glog.Fatalf(e.Error())
glog.Fatalf("Filer listener error: %v", e)
}
if e := http.Serve(filerListener, r); e != nil {
glog.Fatalf("Filer Fail to serve:%s", e.Error())
glog.Fatalf("Filer Fail to serve: %v", e)
}
return true

4
go/weed/master.go

@ -71,7 +71,7 @@ func runMaster(cmd *Command, args []string) bool {
listener, e := util.NewListener(listeningAddress, time.Duration(*mTimeout)*time.Second)
if e != nil {
glog.Fatalf(e.Error())
glog.Fatalf("Master startup error: %v", e)
}
go func() {
@ -93,7 +93,7 @@ func runMaster(cmd *Command, args []string) bool {
}()
if e := http.Serve(listener, r); e != nil {
glog.Fatalf("Fail to serve:%s", e.Error())
glog.Fatalf("Fail to serve: %v", e)
}
return true
}

19
go/weed/server.go

@ -81,10 +81,10 @@ func init() {
filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified")
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
filerOptions.cassandra_server = cmdFiler.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server")
filerOptions.cassandra_keyspace = cmdFiler.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server")
filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
filerOptions.redis_database = cmdFiler.Flag.Int("filer.redis.database", 0, "the database on the redis server")
filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server")
}
@ -167,7 +167,7 @@ func runServer(cmd *Command, args []string) bool {
"", 0,
)
if nfs_err != nil {
glog.Fatalf(nfs_err.Error())
glog.Fatalf("Filer startup error: %v", nfs_err)
}
glog.V(0).Infoln("Start Seaweed Filer", util.VERSION, "at port", strconv.Itoa(*filerOptions.port))
filerListener, e := util.NewListener(
@ -175,10 +175,11 @@ func runServer(cmd *Command, args []string) bool {
time.Duration(10)*time.Second,
)
if e != nil {
glog.Fatalf("Filer listener error: %v", e)
glog.Fatalf(e.Error())
}
if e := http.Serve(filerListener, r); e != nil {
glog.Fatalf("Filer Fail to serve:%s", e.Error())
glog.Fatalf("Filer Fail to serve: %v", e)
}
}()
}
@ -199,7 +200,7 @@ func runServer(cmd *Command, args []string) bool {
glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort))
masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), time.Duration(*serverTimeout)*time.Second)
if e != nil {
glog.Fatalf(e.Error())
glog.Fatalf("Master startup error: %v", e)
}
go func() {
@ -224,7 +225,7 @@ func runServer(cmd *Command, args []string) bool {
volumeWait.Wait()
time.Sleep(100 * time.Millisecond)
r := http.NewServeMux()
volumeServer := weed_server.NewVolumeServer(r, *serverIp, *volumePort, *serverPublicIp, folders, maxCounts,
volumeServer := weed_server.NewVolumeServer(r, r, *serverIp, *volumePort, *serverPublicIp, folders, maxCounts,
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
serverWhiteList, *volumeFixJpgOrientation,
)
@ -235,7 +236,7 @@ func runServer(cmd *Command, args []string) bool {
time.Duration(*serverTimeout)*time.Second,
)
if e != nil {
glog.Fatalf(e.Error())
glog.Fatalf("Volume server listener error: %v", e)
}
OnInterrupt(func() {
@ -244,7 +245,7 @@ func runServer(cmd *Command, args []string) bool {
})
if e := http.Serve(volumeListener, r); e != nil {
glog.Fatalf("Fail to serve:%s", e.Error())
glog.Fatalf("Volume server fail to serve:%v", e)
}
return true

102
go/weed/volume.go

@ -13,8 +13,42 @@ import (
"github.com/chrislusf/weed-fs/go/weed/weed_server"
)
var (
v VolumeServerOptions
)
type VolumeServerOptions struct {
port *int
adminPort *int
folders []string
folderMaxLimits []int
ip *string
publicIp *string
bindIp *string
master *string
pulseSeconds *int
idleConnectionTimeout *int
maxCpu *int
dataCenter *string
rack *string
whiteList []string
fixJpgOrientation *bool
}
func init() {
cmdVolume.Run = runVolume // break init cycle
v.port = cmdVolume.Flag.Int("port", 8080, "http listen port")
v.adminPort = cmdVolume.Flag.Int("port.admin", 8443, "https admin port, active when SSL certs are specified. Not ready yet.")
v.ip = cmdVolume.Flag.String("ip", "", "ip or server name")
v.publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible <ip|server_name>")
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
v.master = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds")
v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.")
}
var cmdVolume = &Command{
@ -26,76 +60,66 @@ var cmdVolume = &Command{
}
var (
vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
volumeSecurePort = cmdVolume.Flag.Int("port.secure", 8443, "https listen port, active when SSL certs are specified. Not ready yet.")
volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
ip = cmdVolume.Flag.String("ip", "", "ip or server name")
publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible <ip|server_name>")
volumeBindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
vTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds")
vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.")
volumeWhiteList []string
)
func runVolume(cmd *Command, args []string) bool {
if *vMaxCpu < 1 {
*vMaxCpu = runtime.NumCPU()
if *v.maxCpu < 1 {
*v.maxCpu = runtime.NumCPU()
}
runtime.GOMAXPROCS(*vMaxCpu)
folders := strings.Split(*volumeFolders, ",")
runtime.GOMAXPROCS(*v.maxCpu)
//Set multiple folders and each folder's max volume count limit'
v.folders = strings.Split(*volumeFolders, ",")
maxCountStrings := strings.Split(*maxVolumeCounts, ",")
maxCounts := make([]int, 0)
for _, maxString := range maxCountStrings {
if max, e := strconv.Atoi(maxString); e == nil {
maxCounts = append(maxCounts, max)
v.folderMaxLimits = append(v.folderMaxLimits, max)
} else {
glog.Fatalf("The max specified in -max not a valid number %s", maxString)
}
}
if len(folders) != len(maxCounts) {
glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(folders), len(maxCounts))
if len(v.folders) != len(v.folderMaxLimits) {
glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits))
}
for _, folder := range folders {
for _, folder := range v.folders {
if err := util.TestFolderWritable(folder); err != nil {
glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err)
}
}
if *publicIp == "" {
if *ip == "" {
*ip = "127.0.0.1"
*publicIp = "localhost"
} else {
*publicIp = *ip
//security related white list configuration
if *volumeWhiteListOption != "" {
v.whiteList = strings.Split(*volumeWhiteListOption, ",")
}
//derive default public ip address
if *v.publicIp == "" {
if *v.ip == "" {
*v.ip = "127.0.0.1"
*v.publicIp = "localhost"
} else {
*v.publicIp = *v.ip
}
if *volumeWhiteListOption != "" {
volumeWhiteList = strings.Split(*volumeWhiteListOption, ",")
}
r := http.NewServeMux()
volumeServer := weed_server.NewVolumeServer(r, *ip, *vport, *publicIp, folders, maxCounts,
*masterNode, *vpulse, *dataCenter, *rack,
volumeWhiteList,
*fixJpgOrientation,
volumeServer := weed_server.NewVolumeServer(r, r, *v.ip, *v.port, *v.publicIp, v.folders, v.folderMaxLimits,
*v.master, *v.pulseSeconds, *v.dataCenter, *v.rack,
v.whiteList,
*v.fixJpgOrientation,
)
listeningAddress := *volumeBindIp + ":" + strconv.Itoa(*vport)
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", listeningAddress)
listener, e := util.NewListener(listeningAddress, time.Duration(*vTimeout)*time.Second)
listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
if e != nil {
glog.Fatalf(e.Error())
glog.Fatalf("Volume server listener error:%v", e)
}
OnInterrupt(func() {
@ -103,7 +127,7 @@ func runVolume(cmd *Command, args []string) bool {
})
if e := http.Serve(listener, r); e != nil {
glog.Fatalf("Fail to serve:%s", e.Error())
glog.Fatalf("Volume server fail to serve: %v", e)
}
return true
}

2
go/weed/weed_server/common.go

@ -61,7 +61,7 @@ func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj inter
// wrapper for writeJson - just logs errors
func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) {
if err := writeJson(w, r, httpStatus, obj); err != nil {
glog.V(0).Infof("error writing JSON %s: %s", obj, err.Error())
glog.V(0).Infof("error writing JSON %s: %v", obj, err)
}
}
func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) {

2
go/weed/weed_server/filer_server.go

@ -45,7 +45,7 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle
fs.filer = flat_namespace.NewFlatNamesapceFiler(master, redis_store)
} else {
if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
glog.Fatalf("Can not start filer in dir %s : %v", err)
glog.Fatalf("Can not start filer in dir %s : %v", dir, err)
return
}

2
go/weed/weed_server/master_server_handlers_admin.go

@ -121,7 +121,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
if machines != nil && len(machines) > 0 {
http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
} else {
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found.", volumeId))
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d not found.", volumeId))
}
}

26
go/weed/weed_server/volume_server.go

@ -22,7 +22,7 @@ type VolumeServer struct {
FixJpgOrientation bool
}
func NewVolumeServer(r *http.ServeMux, ip string, port int, publicIp string, folders []string, maxCounts []int,
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, publicIp string, folders []string, maxCounts []int,
masterNode string, pulseSeconds int,
dataCenter string, rack string,
whiteList []string,
@ -39,18 +39,18 @@ func NewVolumeServer(r *http.ServeMux, ip string, port int, publicIp string, fol
vs.guard = security.NewGuard(whiteList, "")
r.HandleFunc("/status", vs.guard.Secure(vs.statusHandler))
r.HandleFunc("/admin/assign_volume", vs.guard.Secure(vs.assignVolumeHandler))
r.HandleFunc("/admin/vacuum_volume_check", vs.guard.Secure(vs.vacuumVolumeCheckHandler))
r.HandleFunc("/admin/vacuum_volume_compact", vs.guard.Secure(vs.vacuumVolumeCompactHandler))
r.HandleFunc("/admin/vacuum_volume_commit", vs.guard.Secure(vs.vacuumVolumeCommitHandler))
r.HandleFunc("/admin/freeze_volume", vs.guard.Secure(vs.freezeVolumeHandler))
r.HandleFunc("/admin/delete_collection", vs.guard.Secure(vs.deleteCollectionHandler))
r.HandleFunc("/stats/counter", vs.guard.Secure(statsCounterHandler))
r.HandleFunc("/stats/memory", vs.guard.Secure(statsMemoryHandler))
r.HandleFunc("/stats/disk", vs.guard.Secure(vs.statsDiskHandler))
r.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler))
r.HandleFunc("/", vs.storeHandler)
adminMux.HandleFunc("/status", vs.guard.Secure(vs.statusHandler))
adminMux.HandleFunc("/admin/assign_volume", vs.guard.Secure(vs.assignVolumeHandler))
adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.Secure(vs.vacuumVolumeCheckHandler))
adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.Secure(vs.vacuumVolumeCompactHandler))
adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.Secure(vs.vacuumVolumeCommitHandler))
adminMux.HandleFunc("/admin/freeze_volume", vs.guard.Secure(vs.freezeVolumeHandler))
adminMux.HandleFunc("/admin/delete_collection", vs.guard.Secure(vs.deleteCollectionHandler))
adminMux.HandleFunc("/stats/counter", vs.guard.Secure(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.guard.Secure(statsMemoryHandler))
adminMux.HandleFunc("/stats/disk", vs.guard.Secure(vs.statsDiskHandler))
publicMux.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler))
publicMux.HandleFunc("/", vs.storeHandler)
go func() {
connected := true

2
go/weed/weed_server/volume_server_handlers_helper.go

@ -93,7 +93,7 @@ func (w *countingWriter) Write(p []byte) (n int, err error) {
return len(p), nil
}
// rangesMIMESize returns the nunber of bytes it takes to encode the
// rangesMIMESize returns the number of bytes it takes to encode the
// provided ranges as a multipart response.
func rangesMIMESize(ranges []httpRange, contentType string, contentSize int64) (encSize int64) {
var w countingWriter

19
note/security.txt

@ -3,7 +3,7 @@ Design for Seaweed-FS security
Design Objectives
Security can mean many different things. The original vision is that: if you have one machine lying around
somewhere with some disk space, it should be able to join your file system to contribute some disk space and
network bandwidth.
network bandwidth, securely!
To achieve this purpose, the security should be able to:
1. Secure the inter-server communication. Only real cluster servers can join and communicate.
@ -14,7 +14,7 @@ Non Objective
User specific access control.
Design Architect
master, and volume servers all talk securely via 2-way SSL for admin.
master, and volume servers all talk securely via 2-way SSL for admin operations.
upon joining, master gives its secret key to volume servers.
filer or clients talk to master to get secret key, and use the key to generate JWT to write on volume server.
A side benefit:
@ -34,3 +34,18 @@ file uploading:
when filer/clients wants to upload, master generate a JWT
filer~>volume(public port)
master~>volume(public port)
Currently, volume server has 2 ip addresses: ip and publicUrl.
The ip is for admin purpose, and master talk to volume server this way.
The publicUrl is for clients to access the server, via http GET/POST/DELETE etc.
The write operations are secured by JWT.
clients talk to master also via https? possible. Decide on this later.
Dev plan:
1. volume server separate admin from public GET/POST/DELETE handlers
The step 1 may be good enough for most use cases.
If 2-way ssl are still needed
2. volume server add ssl support
3. https connections to operate on volume servers
Loading…
Cancel
Save