Browse Source

Merge pull request #838 from bingoohuang/master

go fmt and fix some typo
pull/842/head
Chris Lu 6 years ago
committed by GitHub
parent
commit
041d29eea3
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      unmaintained/fix_dat/fix_dat.go
  2. 2
      unmaintained/repeated_vacuum/repeated_vacuum.go
  3. 2
      weed/command/backup.go
  4. 12
      weed/command/benchmark.go
  5. 4
      weed/command/filer_copy.go
  6. 2
      weed/command/filer_export.go
  7. 2
      weed/filer2/redis/universal_redis_store.go
  8. 4
      weed/filesys/dir_link.go
  9. 2
      weed/filesys/dir_rename.go
  10. 8
      weed/filesys/wfs.go
  11. 2
      weed/operation/submit.go
  12. 8
      weed/storage/needle.go
  13. 8
      weed/storage/replica_placement_test.go
  14. 3
      weed/storage/store.go
  15. 2
      weed/storage/volume.go
  16. 6
      weed/storage/volume_vacuum.go
  17. 4
      weed/topology/data_node.go
  18. 2
      weed/topology/node.go
  19. 2
      weed/topology/store_replicate.go
  20. 14
      weed/topology/topology_vacuum.go
  21. 12
      weed/topology/volume_growth.go
  22. 10
      weed/topology/volume_layout.go

6
unmaintained/fix_dat/fix_dat.go

@ -77,13 +77,13 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl
readerOffset += int64(count)
// start to read dat file
superblock, err := storage.ReadSuperBlock(datFile)
superBlock, err := storage.ReadSuperBlock(datFile)
if err != nil {
fmt.Printf("cannot read dat file super block: %v", err)
return
}
offset := int64(superblock.BlockSize())
version := superblock.Version()
offset := int64(superBlock.BlockSize())
version := superBlock.Version()
n, rest, err := storage.ReadNeedleHeader(datFile, version, offset)
if err != nil {
fmt.Printf("cannot read needle header: %v", err)

2
unmaintained/repeated_vacuum/repeated_vacuum.go

@ -29,7 +29,7 @@ func main() {
rand.Read(data)
reader := bytes.NewReader(data)
targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url,assignResult.Fid)
targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
_, err = operation.Upload(targetUrl, fmt.Sprintf("test%d", i), reader, false, "", nil, "")
if err != nil {

2
weed/command/backup.go

@ -38,7 +38,7 @@ var cmdBackup = &Command{
This will help to backup future new volumes.
Usually backing up is just copying the .dat (and .idx) files.
But it's tricky to incremententally copy the differences.
But it's tricky to incrementally copy the differences.
The complexity comes when there are multiple addition, deletion and compaction.
This tool will handle them correctly and efficiently, avoiding unnecessary data transporation.

12
weed/command/benchmark.go

@ -45,7 +45,7 @@ var (
)
func init() {
cmdBenchmark.Run = runbenchmark // break init cycle
cmdBenchmark.Run = runBenchmark // break init cycle
cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
@ -101,7 +101,7 @@ var (
readStats *stats
)
func runbenchmark(cmd *Command, args []string) bool {
func runBenchmark(cmd *Command, args []string) bool {
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if *b.maxCpu < 1 {
*b.maxCpu = runtime.NumCPU()
@ -121,17 +121,17 @@ func runbenchmark(cmd *Command, args []string) bool {
masterClient.WaitUntilConnected()
if *b.write {
bench_write()
benchWrite()
}
if *b.read {
bench_read()
benchRead()
}
return true
}
func bench_write() {
func benchWrite() {
fileIdLineChan := make(chan string)
finishChan := make(chan bool)
writeStats = newStats(*b.concurrency)
@ -158,7 +158,7 @@ func bench_write() {
writeStats.printStats()
}
func bench_read() {
func benchRead() {
fileIdLineChan := make(chan string)
finishChan := make(chan bool)
readStats = newStats(*b.concurrency)

4
weed/command/filer_copy.go

@ -315,7 +315,7 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string,
func detectMimeType(f *os.File) string {
head := make([]byte, 512)
f.Seek(0, 0)
f.Seek(0, io.SeekStart)
n, err := f.Read(head)
if err == io.EOF {
return ""
@ -324,7 +324,7 @@ func detectMimeType(f *os.File) string {
fmt.Printf("read head of %v: %v\n", f.Name(), err)
return "application/octet-stream"
}
f.Seek(0, 0)
f.Seek(0, io.SeekStart)
mimeType := http.DetectContentType(head[:n])
return mimeType
}

2
weed/command/filer_export.go

@ -14,7 +14,7 @@ func init() {
}
var cmdFilerExport = &Command{
UsageLine: "filer.export -sourceStore=mysql -targetStroe=cassandra",
UsageLine: "filer.export -sourceStore=mysql -targetStore=cassandra",
Short: "export meta data in filer store",
Long: `Iterate the file tree and export all metadata out

2
weed/filer2/redis/universal_redis_store.go

@ -25,7 +25,7 @@ func (store *UniversalRedisStore) InsertEntry(entry *filer2.Entry) (err error) {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
_, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec) * time.Second).Result()
_, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
if err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)

4
weed/filesys/dir_link.go

@ -6,10 +6,10 @@ import (
"syscall"
"time"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
var _ = fs.NodeSymlinker(&Dir{})

2
weed/filesys/dir_rename.go

@ -1,9 +1,9 @@
package filesys
import (
"context"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
"context"
"math"
"path/filepath"

8
weed/filesys/wfs.go

@ -58,7 +58,7 @@ type statsCache struct {
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
option: option,
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(1024*8).ItemsToPrune(100)),
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(1024 * 8).ItemsToPrune(100)),
pathToHandleIndex: make(map[string]int),
bufPool: sync.Pool{
New: func() interface{} {
@ -174,11 +174,11 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
resp.Blocks = totalDiskSize / blockSize
// Compute the number of used blocks
numblocks := uint64(usedDiskSize / blockSize)
numBlocks := uint64(usedDiskSize / blockSize)
// Report the number of free and available blocks for the block size
resp.Bfree = resp.Blocks - numblocks
resp.Bavail = resp.Blocks - numblocks
resp.Bfree = resp.Blocks - numBlocks
resp.Bavail = resp.Blocks - numBlocks
resp.Bsize = uint32(blockSize)
// Report the total number of possible files in the file system (and those free)

2
weed/operation/submit.go

@ -201,7 +201,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
}
func upload_one_chunk(filename string, reader io.Reader, master,
fileUrl string, jwt security.EncodedJwt,
fileUrl string, jwt security.EncodedJwt,
) (size uint32, e error) {
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
uploadResult, uploadError := Upload(fileUrl, filename, reader, false,

8
weed/storage/needle.go

@ -29,12 +29,12 @@ type Needle struct {
DataSize uint32 `comment:"Data size"` //version2
Data []byte `comment:"The actual file data"`
Flags byte `comment:"boolean flags"` //version2
NameSize uint8 //version2
Flags byte `comment:"boolean flags"` //version2
NameSize uint8 //version2
Name []byte `comment:"maximum 256 characters"` //version2
MimeSize uint8 //version2
MimeSize uint8 //version2
Mime []byte `comment:"maximum 256 characters"` //version2
PairsSize uint16 //version2
PairsSize uint16 //version2
Pairs []byte `comment:"additional name value pairs, json format, maximum 64kB"`
LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
Ttl *TTL

8
weed/storage/replica_placement_test.go

@ -4,11 +4,11 @@ import (
"testing"
)
func TestReplicaPlacemnetSerialDeserial(t *testing.T) {
func TestReplicaPlacementSerialDeserial(t *testing.T) {
rp, _ := NewReplicaPlacementFromString("001")
new_rp, _ := NewReplicaPlacementFromByte(rp.Byte())
if rp.String() != new_rp.String() {
println("expected:", rp.String(), "actual:", new_rp.String())
newRp, _ := NewReplicaPlacementFromByte(rp.Byte())
if rp.String() != newRp.String() {
println("expected:", rp.String(), "actual:", newRp.String())
t.Fail()
}
}

3
weed/storage/store.go

@ -163,7 +163,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
}
volumeMessages = append(volumeMessages, volumeMessage)
} else {
if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
location.deleteVolumeById(v.Id)
glog.V(0).Infoln("volume", v.Id, "is deleted.")
} else {
@ -265,4 +265,3 @@ func (s *Store) DeleteVolume(i VolumeId) error {
return fmt.Errorf("Volume %d not found on disk", i)
}

2
weed/storage/volume.go

@ -120,7 +120,7 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool {
}
// wait either maxDelayMinutes or 10% of ttl minutes
func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
if v.Ttl == nil || v.Ttl.Minutes() == 0 {
return false
}

6
weed/storage/volume_vacuum.go

@ -137,10 +137,10 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
}
incrementedHasUpdatedIndexEntry := make(map[NeedleId]keyField)
for idx_offset := indexSize - NeedleEntrySize; uint64(idx_offset) >= v.lastCompactIndexOffset; idx_offset -= NeedleEntrySize {
for idxOffset := indexSize - NeedleEntrySize; uint64(idxOffset) >= v.lastCompactIndexOffset; idxOffset -= NeedleEntrySize {
var IdxEntry []byte
if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil {
return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err)
if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idxOffset); err != nil {
return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idxOffset, err)
}
key, offset, size := IdxFileEntry(IdxEntry)
glog.V(4).Infof("key %d offset %d size %d", key, offset, size)

4
weed/topology/data_node.go

@ -86,9 +86,9 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) {
dn.RLock()
defer dn.RUnlock()
v_info, ok := dn.volumes[id]
vInfo, ok := dn.volumes[id]
if ok {
return v_info, nil
return vInfo, nil
} else {
return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
}

2
weed/topology/node.go

@ -103,7 +103,7 @@ func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(d
}
if !ret {
glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
err = errors.New("Not enough data node found!")
err = errors.New("No enough data node found!")
}
return
}

2
weed/topology/store_replicate.go

@ -31,7 +31,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
size = ret
return
}
needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
if !needToReplicate {
needToReplicate = s.GetVolume(volumeId).NeedToReplicate()

14
weed/topology/topology_vacuum.go

@ -136,12 +136,12 @@ func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThr
volumeLayout.accessLock.RLock()
tmpMap := make(map[storage.VolumeId]*VolumeLocationList)
for vid, locationlist := range volumeLayout.vid2location {
tmpMap[vid] = locationlist
for vid, locationList := range volumeLayout.vid2location {
tmpMap[vid] = locationList
}
volumeLayout.accessLock.RUnlock()
for vid, locationlist := range tmpMap {
for vid, locationList := range tmpMap {
volumeLayout.accessLock.RLock()
isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid]
@ -152,11 +152,11 @@ func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThr
}
glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
if batchVacuumVolumeCompact(volumeLayout, vid, locationlist, preallocate) {
batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) {
if batchVacuumVolumeCompact(volumeLayout, vid, locationList, preallocate) {
batchVacuumVolumeCommit(volumeLayout, vid, locationList)
} else {
batchVacuumVolumeCleanup(volumeLayout, vid, locationlist)
batchVacuumVolumeCleanup(volumeLayout, vid, locationList)
}
}
}

12
weed/topology/volume_growth.go

@ -126,7 +126,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
}
//find main rack and other racks
mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
}
@ -148,12 +148,12 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
}
return nil
})
if rack_err != nil {
return nil, rack_err
if rackErr != nil {
return nil, rackErr
}
//find main rack and other racks
mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
mainServer, otherServers, serverErr := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
}
@ -162,8 +162,8 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
}
return nil
})
if server_err != nil {
return nil, server_err
if serverErr != nil {
return nil, serverErr
}
servers = append(servers, mainServer.(*DataNode))

10
weed/topology/volume_layout.go

@ -55,8 +55,8 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.vid2location[v.Id].Set(dn)
// glog.V(4).Infof("volume %d added to %s len %d copy %d", v.Id, dn.Id(), vl.vid2location[v.Id].Length(), v.ReplicaPlacement.GetCopyCount())
for _, dn := range vl.vid2location[v.Id].list {
if v_info, err := dn.GetVolumesById(v.Id); err == nil {
if v_info.ReadOnly {
if vInfo, err := dn.GetVolumesById(v.Id); err == nil {
if vInfo.ReadOnly {
glog.V(3).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id)
vl.readonlyVolumes[v.Id] = true
@ -145,13 +145,13 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
len_writers := len(vl.writables)
if len_writers <= 0 {
lenWriters := len(vl.writables)
if lenWriters <= 0 {
glog.V(0).Infoln("No more writable volumes!")
return nil, 0, nil, errors.New("No more writable volumes!")
}
if option.DataCenter == "" {
vid := vl.writables[rand.Intn(len_writers)]
vid := vl.writables[rand.Intn(lenWriters)]
locationList := vl.vid2location[vid]
if locationList != nil {
return &vid, count, locationList, nil

Loading…
Cancel
Save