Browse Source

Merge pull request #5 from chrislusf/master

merge
pull/84/head
yourchanges 10 years ago
parent
commit
55c40997b1
  1. 2
      .travis.yml
  2. 61
      docs/usecases.rst
  3. 8
      go/operation/delete_content.go
  4. 2
      go/operation/lookup.go
  5. 2
      go/operation/submit.go
  6. 4
      go/weed/benchmark.go
  7. 13
      go/weed/master.go
  8. 24
      go/weed/server.go
  9. 16
      go/weed/volume.go
  10. 2
      go/weed/weed_server/common.go
  11. 4
      go/weed/weed_server/filer_server_handlers.go
  12. 2
      go/weed/weed_server/master_server_handlers_admin.go
  13. 4
      go/weed/weed_server/volume_server.go
  14. 2
      go/weed/weed_server/volume_server_handlers.go

2
.travis.yml

@ -1,7 +1,7 @@
language: go language: go
go: go:
- 1.3 - 1.3
- release
- 1.4.1
- tip - tip
before_install: before_install:

61
docs/usecases.rst

@ -52,4 +52,63 @@ However, if blocking servicing port is not feasible or trivial, a white list opt
weed master -whiteList="::1,127.0.0.1" weed master -whiteList="::1,127.0.0.1"
weed volume -whiteList="::1,127.0.0.1" weed volume -whiteList="::1,127.0.0.1"
# "::1" is for IP v6 localhost.
# "::1" is for IP v6 localhost.
Data Migration Example
#############################
.. code-block:: bash
weed master -mdir="/tmp/mdata" -defaultReplication="001" -ip="localhost" -port=9334
weed volume -dir=/tmp/vol1/ -mserver="localhost:9334" -ip="localhost" -port=8081
weed volume -dir=/tmp/vol2/ -mserver="localhost:9334" -ip="localhost" -port=8082
weed volume -dir=/tmp/vol3/ -mserver="localhost:9334" -ip="localhost" -port=8083
.. code-block:: bash
ls vol1 vol2 vol3
vol1:
1.dat 1.idx 2.dat 2.idx 3.dat 3.idx 5.dat 5.idx
vol2:
2.dat 2.idx 3.dat 3.idx 4.dat 4.idx 6.dat 6.idx
vol3:
1.dat 1.idx 4.dat 4.idx 5.dat 5.idx 6.dat 6.idx
stop all of them
move vol3/* to vol1 and vol2
it is ok to move x.dat and x.idx from one volumeserver to another volumeserver,
because they are exactly the same.
it can be checked by md5.
.. code-block:: bash
md5 vol1/1.dat vol2/1.dat
MD5 (vol1/1.dat) = c1a49a0ee550b44fef9f8ae9e55215c7
MD5 (vol2/1.dat) = c1a49a0ee550b44fef9f8ae9e55215c7
md5 vol1/1.idx vol2/1.idx
MD5 (vol1/1.idx) = b9edc95795dfb3b0f9063c9cc9ba8095
MD5 (vol2/1.idx) = b9edc95795dfb3b0f9063c9cc9ba8095
.. code-block:: bash
ls vol1 vol2 vol3
vol1:
1.dat 1.idx 2.dat 2.idx 3.dat 3.idx 4.dat 4.idx 5.dat 5.idx 6.dat 6.idx
vol2:
1.dat 1.idx 2.dat 2.idx 3.dat 3.idx 4.dat 4.idx 5.dat 5.idx 6.dat 6.idx
vol3:
start
.. code-block:: bash
weed master -mdir="/tmp/mdata" -defaultReplication="001" -ip="localhost" -port=9334
weed volume -dir=/tmp/vol1/ -mserver="localhost:9334" -ip="localhost" -port=8081
weed volume -dir=/tmp/vol2/ -mserver="localhost:9334" -ip="localhost" -port=8082
so we finished moving data of localhost:8083 to localhost:8081/localhost:8082

8
go/operation/delete_content.go

@ -66,11 +66,11 @@ func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) {
continue continue
} }
for _, location := range result.Locations { for _, location := range result.Locations {
if _, ok := server_to_fileIds[location.PublicUrl]; !ok {
server_to_fileIds[location.PublicUrl] = make([]string, 0)
if _, ok := server_to_fileIds[location.Url]; !ok {
server_to_fileIds[location.Url] = make([]string, 0)
} }
server_to_fileIds[location.PublicUrl] = append(
server_to_fileIds[location.PublicUrl], vid_to_fileIds[vid]...)
server_to_fileIds[location.Url] = append(
server_to_fileIds[location.Url], vid_to_fileIds[vid]...)
} }
} }

2
go/operation/lookup.go

@ -72,7 +72,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) {
if len(lookup.Locations) == 0 { if len(lookup.Locations) == 0 {
return "", errors.New("File Not Found") return "", errors.New("File Not Found")
} }
return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl + "/" + fileId, nil
return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, nil
} }
// LookupVolumeIds find volume locations by cache and actual lookup // LookupVolumeIds find volume locations by cache and actual lookup

2
go/operation/submit.go

@ -137,7 +137,7 @@ func upload_one_chunk(filename string, reader io.Reader, master, replication str
if err != nil { if err != nil {
return "", 0, err return "", 0, err
} }
fileUrl, fid := "http://"+ret.PublicUrl+"/"+ret.Fid, ret.Fid
fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
uploadResult, uploadError := Upload(fileUrl, filename, reader, false, "application/octet-stream") uploadResult, uploadError := Upload(fileUrl, filename, reader, false, "application/octet-stream")
if uploadError != nil { if uploadError != nil {

4
go/weed/benchmark.go

@ -203,7 +203,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
fileSize := int64(*b.fileSize + rand.Intn(64)) fileSize := int64(*b.fileSize + rand.Intn(64))
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize} fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil { if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
if _, err := fp.Upload(0, *b.server); err == nil { if _, err := fp.Upload(0, *b.server); err == nil {
if rand.Intn(100) < *b.deletePercentage { if rand.Intn(100) < *b.deletePercentage {
s.total++ s.total++
@ -251,7 +251,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
if _, now_ok := b.vid2server[vid]; !now_ok { if _, now_ok := b.vid2server[vid]; !now_ok {
if ret, err := operation.Lookup(*b.server, vid); err == nil { if ret, err := operation.Lookup(*b.server, vid); err == nil {
if len(ret.Locations) > 0 { if len(ret.Locations) > 0 {
server = ret.Locations[0].PublicUrl
server = ret.Locations[0].Url
b.vid2server[vid] = server b.vid2server[vid] = server
} }
} }

13
go/weed/master.go

@ -31,7 +31,6 @@ var (
mport = cmdMaster.Flag.Int("port", 9333, "http listen port") mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
masterIp = cmdMaster.Flag.String("ip", "", "master listening ip address, default to listen on all network interfaces") masterIp = cmdMaster.Flag.String("ip", "", "master listening ip address, default to listen on all network interfaces")
masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
mPublicIp = cmdMaster.Flag.String("publicIp", "", "peer accessible <ip>|<server_name>")
metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list") masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list")
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
@ -76,19 +75,15 @@ func runMaster(cmd *Command, args []string) bool {
go func() { go func() {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
if *mPublicIp == "" {
if *masterIp == "" {
*mPublicIp = "localhost"
} else {
*mPublicIp = *masterIp
}
if *masterIp == "" {
*masterIp = "localhost"
} }
myPublicMasterAddress := *mPublicIp + ":" + strconv.Itoa(*mport)
myMasterAddress := *masterIp + ":" + strconv.Itoa(*mport)
var peers []string var peers []string
if *masterPeers != "" { if *masterPeers != "" {
peers = strings.Split(*masterPeers, ",") peers = strings.Split(*masterPeers, ",")
} }
raftServer := weed_server.NewRaftServer(r, peers, myPublicMasterAddress, *metaFolder, ms.Topo, *mpulse)
raftServer := weed_server.NewRaftServer(r, peers, myMasterAddress, *metaFolder, ms.Topo, *mpulse)
ms.SetRaftServer(raftServer) ms.SetRaftServer(raftServer)
}() }()

24
go/weed/server.go

@ -48,7 +48,7 @@ var cmdServer = &Command{
var ( var (
serverIp = cmdServer.Flag.String("ip", "", "ip or server name") serverIp = cmdServer.Flag.String("ip", "", "ip or server name")
serverPublicIp = cmdServer.Flag.String("publicIp", "", "ip or server name")
serverPublicUrl = cmdServer.Flag.String("publicUrl", "", "publicly accessible address")
serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
serverTimeout = cmdServer.Flag.Int("idleTimeout", 10, "connection idle seconds") serverTimeout = cmdServer.Flag.Int("idleTimeout", 10, "connection idle seconds")
@ -99,19 +99,15 @@ func runServer(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }
if *serverPublicIp == "" {
if *serverIp == "" {
*serverPublicIp = "localhost"
} else {
*serverPublicIp = *serverIp
}
if *serverIp == "" {
*serverIp = "localhost"
} }
if *filerOptions.redirectOnRead { if *filerOptions.redirectOnRead {
*isStartingFiler = true *isStartingFiler = true
} }
*filerOptions.master = *serverPublicIp + ":" + strconv.Itoa(*masterPort)
*filerOptions.master = *serverIp + ":" + strconv.Itoa(*masterPort)
if *filerOptions.defaultReplicaPlacement == "" { if *filerOptions.defaultReplicaPlacement == "" {
*filerOptions.defaultReplicaPlacement = *masterDefaultReplicaPlacement *filerOptions.defaultReplicaPlacement = *masterDefaultReplicaPlacement
@ -144,9 +140,11 @@ func runServer(cmd *Command, args []string) bool {
if *masterMetaFolder == "" { if *masterMetaFolder == "" {
*masterMetaFolder = folders[0] *masterMetaFolder = folders[0]
} }
if *filerOptions.dir == "" {
*filerOptions.dir = *masterMetaFolder + "/filer"
os.MkdirAll(*filerOptions.dir, 0700)
if *isStartingFiler {
if *filerOptions.dir == "" {
*filerOptions.dir = *masterMetaFolder + "/filer"
os.MkdirAll(*filerOptions.dir, 0700)
}
} }
if err := util.TestFolderWritable(*masterMetaFolder); err != nil { if err := util.TestFolderWritable(*masterMetaFolder); err != nil {
glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err) glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err)
@ -207,7 +205,7 @@ func runServer(cmd *Command, args []string) bool {
go func() { go func() {
raftWaitForMaster.Wait() raftWaitForMaster.Wait()
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
myAddress := *serverPublicIp + ":" + strconv.Itoa(*masterPort)
myAddress := *serverIp + ":" + strconv.Itoa(*masterPort)
var peers []string var peers []string
if *serverPeers != "" { if *serverPeers != "" {
peers = strings.Split(*serverPeers, ",") peers = strings.Split(*serverPeers, ",")
@ -227,7 +225,7 @@ func runServer(cmd *Command, args []string) bool {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
r := http.NewServeMux() r := http.NewServeMux()
volumeServer := weed_server.NewVolumeServer(r, r, volumeServer := weed_server.NewVolumeServer(r, r,
*serverIp, *volumePort, *volumeAdminPort, *serverPublicIp,
*serverIp, *volumePort, *volumeAdminPort, *serverPublicUrl,
folders, maxCounts, folders, maxCounts,
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
serverWhiteList, *volumeFixJpgOrientation, serverWhiteList, *volumeFixJpgOrientation,

16
go/weed/volume.go

@ -23,7 +23,7 @@ type VolumeServerOptions struct {
folders []string folders []string
folderMaxLimits []int folderMaxLimits []int
ip *string ip *string
publicIp *string
publicUrl *string
bindIp *string bindIp *string
master *string master *string
pulseSeconds *int pulseSeconds *int
@ -40,7 +40,7 @@ func init() {
v.port = cmdVolume.Flag.Int("port", 8080, "http listen port") v.port = cmdVolume.Flag.Int("port", 8080, "http listen port")
v.adminPort = cmdVolume.Flag.Int("port.admin", 0, "admin port to talk with master and other volume servers") v.adminPort = cmdVolume.Flag.Int("port.admin", 0, "admin port to talk with master and other volume servers")
v.ip = cmdVolume.Flag.String("ip", "", "ip or server name") v.ip = cmdVolume.Flag.String("ip", "", "ip or server name")
v.publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible <ip|server_name>")
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
v.master = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") v.master = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
@ -95,14 +95,8 @@ func runVolume(cmd *Command, args []string) bool {
v.whiteList = strings.Split(*volumeWhiteListOption, ",") v.whiteList = strings.Split(*volumeWhiteListOption, ",")
} }
//derive default public ip address
if *v.publicIp == "" {
if *v.ip == "" {
*v.ip = "127.0.0.1"
*v.publicIp = "localhost"
} else {
*v.publicIp = *v.ip
}
if *v.ip == "" {
*v.ip = "127.0.0.1"
} }
if *v.adminPort == 0 { if *v.adminPort == 0 {
@ -117,7 +111,7 @@ func runVolume(cmd *Command, args []string) bool {
} }
volumeServer := weed_server.NewVolumeServer(publicMux, adminMux, volumeServer := weed_server.NewVolumeServer(publicMux, adminMux,
*v.ip, *v.port, *v.adminPort, *v.publicIp,
*v.ip, *v.port, *v.adminPort, *v.publicUrl,
v.folders, v.folderMaxLimits, v.folders, v.folderMaxLimits,
*v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack,
v.whiteList, v.whiteList,

2
go/weed/weed_server/common.go

@ -96,7 +96,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
return return
} }
url := "http://" + assignResult.PublicUrl + "/" + assignResult.Fid
url := "http://" + assignResult.Url + "/" + assignResult.Fid
if lastModified != 0 { if lastModified != 0 {
url = url + "?ts=" + strconv.FormatUint(lastModified, 10) url = url + "?ts=" + strconv.FormatUint(lastModified, 10)
} }

4
go/weed/weed_server/filer_server_handlers.go

@ -80,7 +80,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return
} }
urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl
urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].Url
urlString := "http://" + urlLocation + "/" + fileId urlString := "http://" + urlLocation + "/" + fileId
if fs.redirectOnRead { if fs.redirectOnRead {
http.Redirect(w, r, urlString, http.StatusFound) http.Redirect(w, r, urlString, http.StatusFound)
@ -126,7 +126,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
u, _ := url.Parse("http://" + assignResult.PublicUrl + "/" + assignResult.Fid)
u, _ := url.Parse("http://" + assignResult.Url + "/" + assignResult.Fid)
glog.V(4).Infoln("post to", u) glog.V(4).Infoln("post to", u)
request := &http.Request{ request := &http.Request{
Method: r.Method, Method: r.Method,

2
go/weed/weed_server/master_server_handlers_admin.go

@ -119,7 +119,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
} }
machines := ms.Topo.Lookup("", volumeId) machines := ms.Topo.Lookup("", volumeId)
if machines != nil && len(machines) > 0 { if machines != nil && len(machines) > 0 {
http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
http.Redirect(w, r, "http://"+machines[0].Url()+r.URL.Path, http.StatusMovedPermanently)
} else { } else {
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d not found.", volumeId)) writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d not found.", volumeId))
} }

4
go/weed/weed_server/volume_server.go

@ -3,7 +3,6 @@ package weed_server
import ( import (
"math/rand" "math/rand"
"net/http" "net/http"
"strconv"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
@ -23,13 +22,12 @@ type VolumeServer struct {
} }
func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string, func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
port, adminPort int, publicIp string,
port, adminPort int, publicUrl string,
folders []string, maxCounts []int, folders []string, maxCounts []int,
masterNode string, pulseSeconds int, masterNode string, pulseSeconds int,
dataCenter string, rack string, dataCenter string, rack string,
whiteList []string, whiteList []string,
fixJpgOrientation bool) *VolumeServer { fixJpgOrientation bool) *VolumeServer {
publicUrl := publicIp + ":" + strconv.Itoa(port)
vs := &VolumeServer{ vs := &VolumeServer{
masterNode: masterNode, masterNode: masterNode,
pulseSeconds: pulseSeconds, pulseSeconds: pulseSeconds,

2
go/weed/weed_server/volume_server_handlers.go

@ -61,7 +61,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
lookupResult, err := operation.Lookup(vs.masterNode, volumeId.String()) lookupResult, err := operation.Lookup(vs.masterNode, volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 { if err == nil && len(lookupResult.Locations) > 0 {
http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
http.Redirect(w, r, "http://"+lookupResult.Locations[0].Url+r.URL.Path, http.StatusMovedPermanently)
} else { } else {
glog.V(2).Infoln("lookup error:", err, r.URL.Path) glog.V(2).Infoln("lookup error:", err, r.URL.Path)
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)

Loading…
Cancel
Save