From db8e27be6ec7daa1a188f90f61e385c04cb6b008 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 26 Feb 2013 22:54:22 -0800 Subject: [PATCH] add lots of error checking by GThomas --- go/directory/file_id.go | 2 +- go/operation/allocate_volume.go | 6 +- go/operation/lookup_volume_id.go | 4 +- go/operation/upload_content.go | 14 ++- go/replication/volume_growth.go | 6 +- go/replication/volume_growth_test.go | 9 +- go/sequence/sequence.go | 15 ++- go/storage/compact_map_perf_test.go | 2 +- go/storage/compress.go | 2 +- go/storage/needle_map.go | 9 +- go/storage/needle_read_write.go | 2 +- go/storage/store.go | 6 +- go/storage/volume.go | 27 ++++-- go/topology/data_node.go | 2 +- go/topology/node.go | 2 +- go/topology/node_list.go | 2 +- go/topology/node_list_test.go | 14 ++- go/topology/topo_test.go | 124 +++++++++++++------------ go/topology/topology.go | 12 +-- go/topology/topology_compact.go | 4 +- go/topology/topology_event_handling.go | 2 +- go/topology/volume_layout.go | 2 +- go/weed/export.go | 4 +- go/weed/fix.go | 4 +- go/weed/master.go | 52 ++++++----- go/weed/shell.go | 14 ++- go/weed/upload.go | 4 +- go/weed/volume.go | 59 +++++++----- go/weed/weed.go | 33 +++++-- 29 files changed, 268 insertions(+), 170 deletions(-) diff --git a/go/directory/file_id.go b/go/directory/file_id.go index f5f6d46d6..807a10f7a 100644 --- a/go/directory/file_id.go +++ b/go/directory/file_id.go @@ -1,9 +1,9 @@ package directory import ( - "encoding/hex" "code.google.com/p/weed-fs/go/storage" "code.google.com/p/weed-fs/go/util" + "encoding/hex" "strings" ) diff --git a/go/operation/allocate_volume.go b/go/operation/allocate_volume.go index 19166eaed..ea34901ef 100644 --- a/go/operation/allocate_volume.go +++ b/go/operation/allocate_volume.go @@ -1,12 +1,12 @@ package operation import ( - "encoding/json" - "errors" - "net/url" "code.google.com/p/weed-fs/go/storage" "code.google.com/p/weed-fs/go/topology" "code.google.com/p/weed-fs/go/util" + "encoding/json" + "errors" + "net/url" ) type AllocateVolumeResult struct { diff --git a/go/operation/lookup_volume_id.go b/go/operation/lookup_volume_id.go index 8512ac918..0d8f247be 100644 --- a/go/operation/lookup_volume_id.go +++ b/go/operation/lookup_volume_id.go @@ -1,12 +1,12 @@ package operation import ( + "code.google.com/p/weed-fs/go/storage" + "code.google.com/p/weed-fs/go/util" "encoding/json" "errors" _ "fmt" "net/url" - "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/util" ) type Location struct { diff --git a/go/operation/upload_content.go b/go/operation/upload_content.go index 0bdb697da..cae657b2c 100644 --- a/go/operation/upload_content.go +++ b/go/operation/upload_content.go @@ -21,9 +21,19 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, body_buf := bytes.NewBufferString("") body_writer := multipart.NewWriter(body_buf) file_writer, err := body_writer.CreateFormFile("file", filename) - io.Copy(file_writer, reader) + if err != nil { + log.Println("error creating form file", err) + return nil, err + } + if _, err = io.Copy(file_writer, reader); err != nil { + log.Println("error copying data", err) + return nil, err + } content_type := body_writer.FormDataContentType() - body_writer.Close() + if err = body_writer.Close(); err != nil { + log.Println("error closing body", err) + return nil, err + } resp, err := http.Post(uploadUrl, content_type, body_buf) if err != nil { log.Println("failing to upload to", uploadUrl) diff --git a/go/replication/volume_growth.go b/go/replication/volume_growth.go index 747e07642..1f266f73f 100644 --- a/go/replication/volume_growth.go +++ b/go/replication/volume_growth.go @@ -1,12 +1,12 @@ package replication import ( - "errors" - "fmt" - "math/rand" "code.google.com/p/weed-fs/go/operation" "code.google.com/p/weed-fs/go/storage" "code.google.com/p/weed-fs/go/topology" + "errors" + "fmt" + "math/rand" "sync" ) diff --git a/go/replication/volume_growth_test.go b/go/replication/volume_growth_test.go index e35dbc707..3fbeebc9e 100644 --- a/go/replication/volume_growth_test.go +++ b/go/replication/volume_growth_test.go @@ -1,11 +1,11 @@ package replication import ( + "code.google.com/p/weed-fs/go/storage" + "code.google.com/p/weed-fs/go/topology" "encoding/json" "fmt" "math/rand" - "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/topology" "testing" "time" ) @@ -96,7 +96,10 @@ func setup(topologyLayout string) *topology.Topology { rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} + vi := storage.VolumeInfo{ + Id: storage.VolumeId(int64(m["id"].(float64))), + Size: uint64(m["size"].(float64)), + Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) diff --git a/go/sequence/sequence.go b/go/sequence/sequence.go index c85289468..0237f1f80 100644 --- a/go/sequence/sequence.go +++ b/go/sequence/sequence.go @@ -36,10 +36,15 @@ func NewSequencer(dirname string, filename string) (m *SequencerImpl) { } else { decoder := gob.NewDecoder(seqFile) defer seqFile.Close() - decoder.Decode(&m.FileIdSequence) - log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) + if se = decoder.Decode(&m.FileIdSequence); se != nil { + log.Printf("error decoding FileIdSequence: %s", se) + m.FileIdSequence = FileIdSaveInterval + log.Println("Setting file id sequence", m.FileIdSequence) + } else { + log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) + m.FileIdSequence += FileIdSaveInterval + } //in case the server stops between intervals - m.FileIdSequence += FileIdSaveInterval } return } @@ -67,5 +72,7 @@ func (m *SequencerImpl) saveSequence() { } defer seqFile.Close() encoder := gob.NewEncoder(seqFile) - encoder.Encode(m.FileIdSequence) + if e = encoder.Encode(m.FileIdSequence); e != nil { + log.Fatalf("Sequence File Save [ERROR] %s\n", e) + } } diff --git a/go/storage/compact_map_perf_test.go b/go/storage/compact_map_perf_test.go index cfc53ef65..b885ebd93 100644 --- a/go/storage/compact_map_perf_test.go +++ b/go/storage/compact_map_perf_test.go @@ -1,9 +1,9 @@ package storage import ( + "code.google.com/p/weed-fs/go/util" "log" "os" - "code.google.com/p/weed-fs/go/util" "testing" ) diff --git a/go/storage/compress.go b/go/storage/compress.go index 256789c9c..e8816422b 100644 --- a/go/storage/compress.go +++ b/go/storage/compress.go @@ -10,7 +10,7 @@ import ( /* * Default more not to gzip since gzip can be done on client side. -*/ + */ func IsGzippable(ext, mtype string) bool { if strings.HasPrefix(mtype, "text/") { return true diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index 4465fab22..b2e232009 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -94,14 +94,17 @@ func (nm *NeedleMap) Delete(key uint64) error { util.Uint32toBytes(nm.bytes[8:12], 0) util.Uint32toBytes(nm.bytes[12:16], 0) if _, err = nm.indexFile.Write(nm.bytes); err != nil { - nm.indexFile.Truncate(offset) - return fmt.Errorf("error writing to indexfile %s: %s", nm.indexFile, err) + plus := "" + if e := nm.indexFile.Truncate(offset); e != nil { + plus = "\ncouldn't truncate index file: " + e.Error() + } + return fmt.Errorf("error writing to indexfile %s: %s%s", nm.indexFile, err, plus) } nm.deletionCounter++ return nil } func (nm *NeedleMap) Close() { - nm.indexFile.Close() + _ = nm.indexFile.Close() } func (nm *NeedleMap) ContentSize() uint64 { return nm.fileByteCounter diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index 881db50a6..c5f27ea21 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -1,11 +1,11 @@ package storage import ( + "code.google.com/p/weed-fs/go/util" "errors" "fmt" "io" "os" - "code.google.com/p/weed-fs/go/util" ) const ( diff --git a/go/storage/store.go b/go/storage/store.go index 857a62506..142ee84e2 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -1,12 +1,12 @@ package storage import ( + "code.google.com/p/weed-fs/go/util" "encoding/json" "errors" "io/ioutil" "log" "net/url" - "code.google.com/p/weed-fs/go/util" "strconv" "strings" ) @@ -175,7 +175,9 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { size, err = v.write(n) if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() { log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit) - s.Join() + if err = s.Join(); err != nil { + log.Printf("error with Join: %s", err) + } } return } diff --git a/go/storage/volume.go b/go/storage/volume.go index aeb1126d7..b9c7484d7 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -86,7 +86,7 @@ func (v *Volume) Close() { v.accessLock.Lock() defer v.accessLock.Unlock() v.nm.Close() - v.dataFile.Close() + _ = v.dataFile.Close() } func (v *Volume) maybeWriteSuperBlock() error { stat, e := v.dataFile.Stat() @@ -101,7 +101,9 @@ func (v *Volume) maybeWriteSuperBlock() error { return e } func (v *Volume) readSuperBlock() (err error) { - v.dataFile.Seek(0, 0) + if _, err = v.dataFile.Seek(0, 0); err != nil { + return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile, err) + } header := make([]byte, SuperBlockSize) if _, e := v.dataFile.Read(header); e != nil { return fmt.Errorf("cannot read superblock: %s", e) @@ -128,7 +130,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { return } if size, err = n.Append(v.dataFile, v.Version()); err != nil { - v.dataFile.Truncate(offset) + if e := v.dataFile.Truncate(offset); e != nil { + err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile, e) + } return } nv, ok := v.nm.Get(n.Id) @@ -143,9 +147,14 @@ func (v *Volume) delete(n *Needle) (uint32, error) { nv, ok := v.nm.Get(n.Id) //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) if ok { - v.nm.Delete(n.Id) - v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0) - _, err := n.Append(v.dataFile, v.Version()) + var err error + if err = v.nm.Delete(n.Id); err != nil { + return nv.Size, err + } + if _, err = v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0); err != nil { + return nv.Size, err + } + _, err = n.Append(v.dataFile, v.Version()) return nv.Size, err } return 0, nil @@ -156,7 +165,9 @@ func (v *Volume) read(n *Needle) (int, error) { defer v.accessLock.Unlock() nv, ok := v.nm.Get(n.Id) if ok && nv.Offset > 0 { - v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0) + if _, err := v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0); err != nil { + return -1, err + } return n.Read(v.dataFile, nv.Size, v.Version()) } return -1, errors.New("Not Found") @@ -176,7 +187,7 @@ func (v *Volume) compact() error { func (v *Volume) commitCompact() error { v.accessLock.Lock() defer v.accessLock.Unlock() - v.dataFile.Close() + _ = v.dataFile.Close() var e error if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil { return e diff --git a/go/topology/data_node.go b/go/topology/data_node.go index ba37f0d5f..ea4ea5d39 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -1,8 +1,8 @@ package topology import ( - _ "fmt" "code.google.com/p/weed-fs/go/storage" + _ "fmt" "strconv" ) diff --git a/go/topology/node.go b/go/topology/node.go index 90826dfae..786f76702 100644 --- a/go/topology/node.go +++ b/go/topology/node.go @@ -1,8 +1,8 @@ package topology import ( - "fmt" "code.google.com/p/weed-fs/go/storage" + "fmt" ) type NodeId string diff --git a/go/topology/node_list.go b/go/topology/node_list.go index 293f534ea..db7723714 100644 --- a/go/topology/node_list.go +++ b/go/topology/node_list.go @@ -1,9 +1,9 @@ package topology import ( + "code.google.com/p/weed-fs/go/storage" "fmt" "math/rand" - "code.google.com/p/weed-fs/go/storage" ) type NodeList struct { diff --git a/go/topology/node_list_test.go b/go/topology/node_list_test.go index 4cd2ebaa1..c6e530724 100644 --- a/go/topology/node_list_test.go +++ b/go/topology/node_list_test.go @@ -7,7 +7,11 @@ import ( ) func TestXYZ(t *testing.T) { - topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5) + topo, err := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5) + if err != nil { + t.Error("cannot create new topology:", err) + t.FailNow() + } for i := 0; i < 5; i++ { dc := NewDataCenter("dc" + strconv.Itoa(i)) dc.activeVolumeCount = i @@ -16,22 +20,22 @@ func TestXYZ(t *testing.T) { } nl := NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(1) + picked, ret := nl.RandomlyPickN(1, 0) if !ret || len(picked) != 1 { t.Error("need to randomly pick 1 node") } - picked, ret = nl.RandomlyPickN(4) + picked, ret = nl.RandomlyPickN(4, 0) if !ret || len(picked) != 4 { t.Error("need to randomly pick 4 nodes") } - picked, ret = nl.RandomlyPickN(5) + picked, ret = nl.RandomlyPickN(5, 0) if !ret || len(picked) != 5 { t.Error("need to randomly pick 5 nodes") } - picked, ret = nl.RandomlyPickN(6) + picked, ret = nl.RandomlyPickN(6, 0) if ret || len(picked) != 0 { t.Error("can not randomly pick 6 nodes:", ret, picked) } diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go index f8af79b21..99e570821 100644 --- a/go/topology/topo_test.go +++ b/go/topology/topo_test.go @@ -1,72 +1,72 @@ package topology import ( + "code.google.com/p/weed-fs/go/storage" "encoding/json" "fmt" "math/rand" - "code.google.com/p/weed-fs/go/storage" "testing" "time" ) var topologyLayout = ` { - "dc1":{ - "rack1":{ - "server1":{ - "volumes":[ - {"id":1, "size":12312}, - {"id":2, "size":12312}, - {"id":3, "size":12312} - ], - "limit":3 - }, - "server2":{ - "volumes":[ - {"id":4, "size":12312}, - {"id":5, "size":12312}, - {"id":6, "size":12312} - ], - "limit":10 - } - }, - "rack2":{ - "server1":{ - "volumes":[ - {"id":4, "size":12312}, - {"id":5, "size":12312}, - {"id":6, "size":12312} - ], - "limit":4 - }, - "server2":{ - "volumes":[], - "limit":4 - }, - "server3":{ - "volumes":[ - {"id":2, "size":12312}, - {"id":3, "size":12312}, - {"id":4, "size":12312} - ], - "limit":2 - } - } - }, - "dc2":{ - }, - "dc3":{ - "rack2":{ - "server1":{ - "volumes":[ - {"id":1, "size":12312}, - {"id":3, "size":12312}, - {"id":5, "size":12312} - ], - "limit":4 - } - } - } + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":12312}, + {"id":2, "size":12312}, + {"id":3, "size":12312} + ], + "limit":3 + }, + "server2":{ + "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":10 + } + }, + "rack2":{ + "server1":{ + "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":4 + }, + "server2":{ + "volumes":[], + "limit":4 + }, + "server3":{ + "volumes":[ + {"id":2, "size":12312}, + {"id":3, "size":12312}, + {"id":4, "size":12312} + ], + "limit":2 + } + } + }, + "dc2":{ + }, + "dc3":{ + "rack2":{ + "server1":{ + "volumes":[ + {"id":1, "size":12312}, + {"id":3, "size":12312}, + {"id":5, "size":12312} + ], + "limit":4 + } + } + } } ` @@ -78,7 +78,10 @@ func setup(topologyLayout string) *Topology { } //need to connect all nodes first before server adding volumes - topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5) + topo, err := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5) + if err != nil { + fmt.Println("error:", err) + } mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := NewDataCenter(dcKey) @@ -94,7 +97,10 @@ func setup(topologyLayout string) *Topology { rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} + vi := storage.VolumeInfo{ + Id: storage.VolumeId(int64(m["id"].(float64))), + Size: uint64(m["size"].(float64)), + Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) diff --git a/go/topology/topology.go b/go/topology/topology.go index 70a1ad268..74dc1cd09 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -1,12 +1,12 @@ package topology import ( - "errors" - "io/ioutil" - "math/rand" "code.google.com/p/weed-fs/go/directory" "code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/storage" + "errors" + "io/ioutil" + "math/rand" ) type Topology struct { @@ -28,7 +28,7 @@ type Topology struct { configuration *Configuration } -func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) *Topology { +func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) (*Topology, error) { t := &Topology{} t.id = NodeId(id) t.nodeType = "Topology" @@ -44,9 +44,9 @@ func NewTopology(id string, confFile string, dirname string, sequenceFilename st t.chanRecoveredDataNodes = make(chan *DataNode) t.chanFullVolumes = make(chan storage.VolumeInfo) - t.loadConfiguration(confFile) + err := t.loadConfiguration(confFile) - return t + return t, err } func (t *Topology) loadConfiguration(configurationFile string) error { diff --git a/go/topology/topology_compact.go b/go/topology/topology_compact.go index 9c9abde4f..7215edc4e 100644 --- a/go/topology/topology_compact.go +++ b/go/topology/topology_compact.go @@ -1,12 +1,12 @@ package topology import ( + "code.google.com/p/weed-fs/go/storage" + "code.google.com/p/weed-fs/go/util" "encoding/json" "errors" "fmt" "net/url" - "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/util" "time" ) diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 9093bf884..fd2fe3bef 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -1,9 +1,9 @@ package topology import ( + "code.google.com/p/weed-fs/go/storage" "fmt" "math/rand" - "code.google.com/p/weed-fs/go/storage" "time" ) diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index b6e6e8bfe..f5c2e2360 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -1,10 +1,10 @@ package topology import ( + "code.google.com/p/weed-fs/go/storage" "errors" "fmt" "math/rand" - "code.google.com/p/weed-fs/go/storage" ) type VolumeLayout struct { diff --git a/go/weed/export.go b/go/weed/export.go index 6b391024a..9e65a4de3 100644 --- a/go/weed/export.go +++ b/go/weed/export.go @@ -3,12 +3,12 @@ package main import ( "archive/tar" "bytes" + "code.google.com/p/weed-fs/go/directory" + "code.google.com/p/weed-fs/go/storage" "fmt" "log" "os" "path" - "code.google.com/p/weed-fs/go/directory" - "code.google.com/p/weed-fs/go/storage" "strconv" "strings" "text/template" diff --git a/go/weed/fix.go b/go/weed/fix.go index 249007252..597bc0ef9 100644 --- a/go/weed/fix.go +++ b/go/weed/fix.go @@ -1,10 +1,10 @@ package main import ( + "code.google.com/p/weed-fs/go/storage" "log" "os" "path" - "code.google.com/p/weed-fs/go/storage" "strconv" ) @@ -52,7 +52,7 @@ func runFix(cmd *Command, args []string) bool { debug("saved", count, "with error", pe) } else { debug("skipping deleted file ...") - nm.Delete(n.Id) + return nm.Delete(n.Id) } return nil }) diff --git a/go/weed/master.go b/go/weed/master.go index 3d8757c16..f6cc88df0 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -1,13 +1,13 @@ package main import ( + "code.google.com/p/weed-fs/go/replication" + "code.google.com/p/weed-fs/go/storage" + "code.google.com/p/weed-fs/go/topology" "encoding/json" "errors" "log" "net/http" - "code.google.com/p/weed-fs/go/replication" - "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/topology" "runtime" "strconv" "strings" @@ -57,14 +57,14 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) { for _, dn := range machines { ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl}) } - writeJson(w, r, map[string]interface{}{"locations": ret}) + writeJsonQuiet(w, r, map[string]interface{}{"locations": ret}) } else { w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) } } else { w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid}) + writeJsonQuiet(w, r, map[string]string{"error": "unknown volumeId format " + vid}) } } @@ -80,24 +80,27 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) { rt, err := storage.NewReplicationTypeFromString(repType) if err != nil { w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": err.Error()}) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) return } if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 { if topo.FreeSpace() <= 0 { w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "No free volumes left!"}) + writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"}) return } else { - vg.GrowByType(rt, topo) + if _, err = vg.GrowByType(rt, topo); err != nil { + writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()}) + return + } } } fid, count, dn, err := topo.PickForWrite(rt, c) if err == nil { - writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) + writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) } else { w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": err.Error()}) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) } } @@ -112,19 +115,22 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) { s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") publicUrl := r.FormValue("publicUrl") volumes := new([]storage.VolumeInfo) - json.Unmarshal([]byte(r.FormValue("volumes")), volumes) + if err := json.Unmarshal([]byte(r.FormValue("volumes")), volumes); err != nil { + writeJsonQuiet(w, r, map[string]string{"error": "Cannot unmarshal \"volumes\": " + err.Error()}) + return + } debug(s, "volumes", r.FormValue("volumes")) topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount) m := make(map[string]interface{}) m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024 - writeJson(w, r, m) + writeJsonQuiet(w, r, m) } func dirStatusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = VERSION m["Topology"] = topo.ToMap() - writeJson(w, r, m) + writeJsonQuiet(w, r, m) } func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { @@ -153,10 +159,10 @@ func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { } if err != nil { w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()}) + writeJsonQuiet(w, r, map[string]string{"error": "parameter replication " + err.Error()}) } else { w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]interface{}{"count": count}) + writeJsonQuiet(w, r, map[string]interface{}{"count": count}) } } @@ -164,7 +170,7 @@ func volumeStatusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = VERSION m["Volumes"] = topo.ToVolumeMap() - writeJson(w, r, m) + writeJsonQuiet(w, r, m) } func redirectHandler(w http.ResponseWriter, r *http.Request) { @@ -179,7 +185,7 @@ func redirectHandler(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) } else { w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) } } @@ -188,7 +194,11 @@ func runMaster(cmd *Command, args []string) bool { *mMaxCpu = runtime.NumCPU() } runtime.GOMAXPROCS(*mMaxCpu) - topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) + var e error + if topo, e = topology.NewTopology("topo", *confFile, *metaFolder, "weed", + uint64(*volumeSizeLimitMB)*1024*1024, *mpulse); e != nil { + log.Fatalf("cannot create topology:%s", e) + } vg = replication.NewDefaultVolumeGrowth() log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") http.HandleFunc("/dir/assign", dirAssignHandler) @@ -209,9 +219,9 @@ func runMaster(cmd *Command, args []string) bool { Handler: http.DefaultServeMux, ReadTimeout: time.Duration(*mReadTimeout) * time.Second, } - e := srv.ListenAndServe() + e = srv.ListenAndServe() if e != nil { - log.Fatalf("Fail to start:%s", e.Error()) + log.Fatalf("Fail to start:%s", e) } return true } diff --git a/go/weed/shell.go b/go/weed/shell.go index daf0b7e1f..4287f2148 100644 --- a/go/weed/shell.go +++ b/go/weed/shell.go @@ -3,6 +3,7 @@ package main import ( "bufio" "fmt" + "log" "os" ) @@ -25,8 +26,13 @@ func runShell(command *Command, args []string) bool { o := bufio.NewWriter(os.Stdout) e := bufio.NewWriter(os.Stderr) prompt := func() { - o.WriteString("> ") - o.Flush() + var err error + if _, err = o.WriteString("> "); err != nil { + log.Printf("error writing to stdout: %s", err) + } + if err = o.Flush(); err != nil { + log.Printf("error flushing stdout: %s", err) + } } readLine := func() string { ret, err := r.ReadString('\n') @@ -38,7 +44,9 @@ func runShell(command *Command, args []string) bool { } execCmd := func(cmd string) int { if cmd != "" { - o.WriteString(cmd) + if _, err := o.WriteString(cmd); err != nil { + log.Printf("error writing to stdout: %s", err) + } } return 0 } diff --git a/go/weed/upload.go b/go/weed/upload.go index 92478b7b6..a47551ddf 100644 --- a/go/weed/upload.go +++ b/go/weed/upload.go @@ -1,14 +1,14 @@ package main import ( + "code.google.com/p/weed-fs/go/operation" + "code.google.com/p/weed-fs/go/util" "encoding/json" "errors" "fmt" "net/url" "os" "path" - "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/util" "strconv" ) diff --git a/go/weed/volume.go b/go/weed/volume.go index edf2ad821..fd2298541 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -2,13 +2,13 @@ package main import ( "bytes" + "code.google.com/p/weed-fs/go/operation" + "code.google.com/p/weed-fs/go/storage" "log" "math/rand" "mime" "net/http" "os" - "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/storage" "runtime" "strconv" "strings" @@ -48,41 +48,41 @@ func statusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = VERSION m["Volumes"] = store.Status() - writeJson(w, r, m) + writeJsonQuiet(w, r, m) } func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) if err == nil { - writeJson(w, r, map[string]string{"error": ""}) + writeJsonQuiet(w, r, map[string]string{"error": ""}) } else { - writeJson(w, r, map[string]string{"error": err.Error()}) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) } debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) } func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) if err == nil { - writeJson(w, r, map[string]interface{}{"error": "", "result": ret}) + writeJsonQuiet(w, r, map[string]interface{}{"error": "", "result": ret}) } else { - writeJson(w, r, map[string]interface{}{"error": err.Error(), "result": false}) + writeJsonQuiet(w, r, map[string]interface{}{"error": err.Error(), "result": false}) } debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) } func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { err := store.CompactVolume(r.FormValue("volume")) if err == nil { - writeJson(w, r, map[string]string{"error": ""}) + writeJsonQuiet(w, r, map[string]string{"error": ""}) } else { - writeJson(w, r, map[string]string{"error": err.Error()}) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) } debug("compacted volume =", r.FormValue("volume"), ", error =", err) } func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { err := store.CommitCompactVolume(r.FormValue("volume")) if err == nil { - writeJson(w, r, map[string]interface{}{"error": ""}) + writeJsonQuiet(w, r, map[string]interface{}{"error": ""}) } else { - writeJson(w, r, map[string]string{"error": err.Error()}) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) } debug("commit compact volume =", r.FormValue("volume"), ", error =", err) } @@ -163,18 +163,29 @@ func GetHandler(w http.ResponseWriter, r *http.Request) { } } w.Header().Set("Content-Length", strconv.Itoa(len(n.Data))) - w.Write(n.Data) + if _, e = w.Write(n.Data); e != nil { + debug("response write error:", e) + } } func PostHandler(w http.ResponseWriter, r *http.Request) { - r.ParseForm() + if e := r.ParseForm(); e != nil { + debug("form parse error:", e) + writeJsonQuiet(w, r, e) + return + } vid, _, _ := parseURLPath(r.URL.Path) volumeId, e := storage.NewVolumeId(vid) if e != nil { - writeJson(w, r, e) + debug("NewVolumeId error:", e) + writeJsonQuiet(w, r, e) + return + } + if e != nil { + writeJsonQuiet(w, r, e) } else { needle, filename, ne := storage.NewNeedle(r) if ne != nil { - writeJson(w, r, ne) + writeJsonQuiet(w, r, ne) } else { ret, err := store.Write(volumeId, needle) errorStatus := "" @@ -204,15 +215,19 @@ func PostHandler(w http.ResponseWriter, r *http.Request) { if errorStatus == "" { w.WriteHeader(http.StatusCreated) } else { - store.Delete(volumeId, needle) - distributedOperation(volumeId, func(location operation.Location) bool { - return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") - }) + if _, e = store.Delete(volumeId, needle); e != nil { + errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " + + strconv.FormatUint(uint64(volumeId), 10) + ": " + e.Error() + } else { + distributedOperation(volumeId, func(location operation.Location) bool { + return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") + }) + } w.WriteHeader(http.StatusInternalServerError) m["error"] = errorStatus } m["size"] = ret - writeJson(w, r, m) + writeJsonQuiet(w, r, m) } } } @@ -230,7 +245,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { if ok != nil { m := make(map[string]uint32) m["size"] = 0 - writeJson(w, r, m) + writeJsonQuiet(w, r, m) return } @@ -268,7 +283,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]uint32) m["size"] = uint32(count) - writeJson(w, r, m) + writeJsonQuiet(w, r, m) } func parseURLPath(path string) (vid, fid, ext string) { diff --git a/go/weed/weed.go b/go/weed/weed.go index c03cb68ac..e97c8b550 100644 --- a/go/weed/weed.go +++ b/go/weed/weed.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "io" + "log" "math/rand" "net/http" "os" @@ -173,22 +174,40 @@ func exitIfErrors() { exit() } } -func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { + +func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err error) { w.Header().Set("Content-Type", "application/javascript") var bytes []byte if r.FormValue("pretty") != "" { - bytes, _ = json.MarshalIndent(obj, "", " ") + bytes, err = json.MarshalIndent(obj, "", " ") } else { - bytes, _ = json.Marshal(obj) + bytes, err = json.Marshal(obj) + } + if err != nil { + return } callback := r.FormValue("callback") if callback == "" { - w.Write(bytes) + _, err = w.Write(bytes) } else { - w.Write([]uint8(callback)) - w.Write([]uint8("(")) + if _, err = w.Write([]uint8(callback)); err != nil { + return + } + if _, err = w.Write([]uint8("(")); err != nil { + return + } fmt.Fprint(w, string(bytes)) - w.Write([]uint8(")")) + if _, err = w.Write([]uint8(")")); err != nil { + return + } + } + return +} + +// wrapper for writeJson - just logs errors +func writeJsonQuiet(w http.ResponseWriter, r *http.Request, obj interface{}) { + if err := writeJson(w, r, obj); err != nil { + log.Printf("error writing JSON %s: %s", obj, err) } }