709 changed files with 186629 additions and 2889 deletions
-
3.gitignore
-
4.travis.yml
-
2Dockerfile.go_build
-
25Makefile
-
6README.md
-
7go/operation/data_struts.go
-
203go/operation/system_message.pb.go
-
27go/proto/system_message.proto
-
113go/stats/stats.go
-
123go/storage/needle_map.go
-
389go/storage/store.go
-
65go/storage/volume_info.go
-
57go/topology/collection.go
-
100go/topology/data_node.go
-
126go/topology/store_replicate.go
-
189go/topology/topology.go
-
211go/topology/volume_growth.go
-
65go/topology/volume_location_list.go
-
38go/util/concurrent_read_map.go
-
163go/util/http_util.go
-
30go/weed/weed_server/master_server_handlers_ui.go
-
50go/weed/weed_server/volume_server_handlers_admin.go
-
86go/weed/weed_server/volume_server_handlers_sync.go
-
6unmaintained/fix_dat/fix_dat.go
-
4unmaintained/see_idx/see_idx.go
-
190weed/Godeps/Godeps.json
-
5weed/Godeps/Readme
-
0weed/compress/compression_test.go
-
0weed/compress/delta_binary_pack32.go
-
2weed/filer/cassandra_store/cassandra_store.go
-
0weed/filer/cassandra_store/schema.cql
-
4weed/filer/client_operations.go
-
0weed/filer/embedded_filer/design.txt
-
2weed/filer/embedded_filer/directory.go
-
4weed/filer/embedded_filer/directory_in_map.go
-
0weed/filer/embedded_filer/directory_test.go
-
4weed/filer/embedded_filer/filer_embedded.go
-
4weed/filer/embedded_filer/files_in_leveldb.go
-
0weed/filer/filer.go
-
2weed/filer/flat_namespace/flat_namespace_filer.go
-
2weed/filer/flat_namespace/flat_namespace_store.go
-
0weed/filer/redis_store/redis_store.go
-
0weed/glog/LICENSE
-
0weed/glog/README
-
0weed/glog/convenient_api.go
-
2weed/glog/glog.go
-
0weed/glog/glog_file.go
-
0weed/glog/glog_test.go
-
0weed/images/orientation.go
-
0weed/images/orientation_test.go
-
0weed/images/preprocess.go
-
6weed/images/resizing.go
-
29weed/images/resizing_test.go
-
0weed/images/sample1.jpg
-
7weed/main.go
-
6weed/operation/assign_file_id.go
-
61weed/operation/chunked_manifest.go
-
2weed/operation/compress.go
-
10weed/operation/delete_content.go
-
6weed/operation/list_masters.go
-
51weed/operation/lookup.go
-
14weed/operation/lookup_vid_cache.go
-
2weed/operation/lookup_vid_cache_test.go
-
10weed/operation/submit.go
-
7weed/operation/sync_volume.go
-
4weed/operation/upload_content.go
-
28weed/security/guard.go
-
2weed/security/jwt.go
-
0weed/sequence/memory_sequencer.go
-
0weed/sequence/sequence.go
-
0weed/stats/disk.go
-
0weed/stats/disk_notsupported.go
-
0weed/stats/disk_supported.go
-
48weed/stats/duration_counter.go
-
4weed/stats/duration_counter_test.go
-
0weed/stats/memory.go
-
0weed/stats/memory_notsupported.go
-
0weed/stats/memory_supported.go
-
65weed/stats/stats.go
-
133weed/storage/chunked_file_reader.go
-
117weed/storage/collection_settings.go
-
32weed/storage/collection_settings_test.go
-
25weed/storage/compact_map.go
-
6weed/storage/compact_map_perf_test.go
-
0weed/storage/compact_map_test.go
-
2weed/storage/crc.go
-
126weed/storage/disk_location.go
-
27weed/storage/file_id.go
-
76weed/storage/needle.go
-
134weed/storage/needle_map.go
-
6weed/storage/needle_map_boltdb.go
-
6weed/storage/needle_map_leveldb.go
-
26weed/storage/needle_map_memory.go
-
16weed/storage/needle_read_write.go
-
14weed/storage/replica_placement.go
-
0weed/storage/replica_placement_test.go
-
450weed/storage/store.go
-
160weed/storage/store_task.go
-
87weed/storage/store_task_cli.go
-
115weed/storage/store_task_replication.go
@ -1,5 +1,5 @@ |
|||
FROM cydev/go |
|||
RUN go get github.com/chrislusf/seaweedfs/go/weed |
|||
RUN go get github.com/chrislusf/seaweedfs/weed |
|||
EXPOSE 8080 |
|||
EXPOSE 9333 |
|||
VOLUME /data |
|||
@ -1,22 +1,27 @@ |
|||
BINARY = weed |
|||
OUT_DIR = bin |
|||
|
|||
GO_FLAGS = #-v |
|||
SOURCE_DIR = ./go/weed/ |
|||
GO_FLAGS = #-race -v |
|||
SOURCE_DIR = ./weed |
|||
|
|||
all: build |
|||
|
|||
.PHONY : clean deps build linux |
|||
.PHONY : clean godep build linux vet |
|||
|
|||
clean: |
|||
go clean -i $(GO_FLAGS) $(SOURCE_DIR) |
|||
rm -f $(BINARY) |
|||
|
|||
deps: |
|||
go get $(GO_FLAGS) -d $(SOURCE_DIR) |
|||
fmt: |
|||
gofmt -w -s $(SOURCE_DIR) |
|||
|
|||
build: deps |
|||
go build $(GO_FLAGS) -o $(BINARY) $(SOURCE_DIR) |
|||
vet: |
|||
go vet $(SOURCE_DIR)/... |
|||
|
|||
linux: deps |
|||
mkdir -p linux |
|||
GOOS=linux GOARCH=amd64 go build $(GO_FLAGS) -o linux/$(BINARY) $(SOURCE_DIR) |
|||
build: fmt |
|||
mkdir -p $(OUT_DIR) |
|||
go build $(GO_FLAGS) -o $(OUT_DIR)/$(BINARY) $(SOURCE_DIR) |
|||
|
|||
linux: |
|||
mkdir -p $(OUT_DIR)/linux-amd64 |
|||
GOOS=linux GOARCH=amd64 go build $(GO_FLAGS) -o $(OUT_DIR)/linux-amd64/$(BINARY) $(SOURCE_DIR) |
|||
@ -1,7 +0,0 @@ |
|||
package operation |
|||
|
|||
type JoinResult struct { |
|||
VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"` |
|||
SecretKey string `json:"secretKey,omitempty"` |
|||
Error string `json:"error,omitempty"` |
|||
} |
|||
@ -1,203 +0,0 @@ |
|||
// Code generated by protoc-gen-go.
|
|||
// source: system_message.proto
|
|||
// DO NOT EDIT!
|
|||
|
|||
/* |
|||
Package operation is a generated protocol buffer package. |
|||
|
|||
It is generated from these files: |
|||
system_message.proto |
|||
|
|||
It has these top-level messages: |
|||
VolumeInformationMessage |
|||
JoinMessage |
|||
*/ |
|||
package operation |
|||
|
|||
import proto "github.com/golang/protobuf/proto" |
|||
import math "math" |
|||
|
|||
// Reference imports to suppress errors if they are not otherwise used.
|
|||
var _ = proto.Marshal |
|||
var _ = math.Inf |
|||
|
|||
type VolumeInformationMessage struct { |
|||
Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"` |
|||
Size *uint64 `protobuf:"varint,2,req,name=size" json:"size,omitempty"` |
|||
Collection *string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"` |
|||
FileCount *uint64 `protobuf:"varint,4,req,name=file_count" json:"file_count,omitempty"` |
|||
DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"` |
|||
DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"` |
|||
ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"` |
|||
ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"` |
|||
Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"` |
|||
Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"` |
|||
XXX_unrecognized []byte `json:"-"` |
|||
} |
|||
|
|||
func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} } |
|||
func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) } |
|||
func (*VolumeInformationMessage) ProtoMessage() {} |
|||
|
|||
const Default_VolumeInformationMessage_Version uint32 = 2 |
|||
|
|||
func (m *VolumeInformationMessage) GetId() uint32 { |
|||
if m != nil && m.Id != nil { |
|||
return *m.Id |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (m *VolumeInformationMessage) GetSize() uint64 { |
|||
if m != nil && m.Size != nil { |
|||
return *m.Size |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (m *VolumeInformationMessage) GetCollection() string { |
|||
if m != nil && m.Collection != nil { |
|||
return *m.Collection |
|||
} |
|||
return "" |
|||
} |
|||
|
|||
func (m *VolumeInformationMessage) GetFileCount() uint64 { |
|||
if m != nil && m.FileCount != nil { |
|||
return *m.FileCount |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (m *VolumeInformationMessage) GetDeleteCount() uint64 { |
|||
if m != nil && m.DeleteCount != nil { |
|||
return *m.DeleteCount |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 { |
|||
if m != nil && m.DeletedByteCount != nil { |
|||
return *m.DeletedByteCount |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (m *VolumeInformationMessage) GetReadOnly() bool { |
|||
if m != nil && m.ReadOnly != nil { |
|||
return *m.ReadOnly |
|||
} |
|||
return false |
|||
} |
|||
|
|||
func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 { |
|||
if m != nil && m.ReplicaPlacement != nil { |
|||
return *m.ReplicaPlacement |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (m *VolumeInformationMessage) GetVersion() uint32 { |
|||
if m != nil && m.Version != nil { |
|||
return *m.Version |
|||
} |
|||
return Default_VolumeInformationMessage_Version |
|||
} |
|||
|
|||
func (m *VolumeInformationMessage) GetTtl() uint32 { |
|||
if m != nil && m.Ttl != nil { |
|||
return *m.Ttl |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
type JoinMessage struct { |
|||
IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"` |
|||
Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"` |
|||
Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"` |
|||
PublicUrl *string `protobuf:"bytes,4,opt,name=public_url" json:"public_url,omitempty"` |
|||
MaxVolumeCount *uint32 `protobuf:"varint,5,req,name=max_volume_count" json:"max_volume_count,omitempty"` |
|||
MaxFileKey *uint64 `protobuf:"varint,6,req,name=max_file_key" json:"max_file_key,omitempty"` |
|||
DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"` |
|||
Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"` |
|||
Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"` |
|||
AdminPort *uint32 `protobuf:"varint,10,opt,name=admin_port" json:"admin_port,omitempty"` |
|||
XXX_unrecognized []byte `json:"-"` |
|||
} |
|||
|
|||
func (m *JoinMessage) Reset() { *m = JoinMessage{} } |
|||
func (m *JoinMessage) String() string { return proto.CompactTextString(m) } |
|||
func (*JoinMessage) ProtoMessage() {} |
|||
|
|||
func (m *JoinMessage) GetIsInit() bool { |
|||
if m != nil && m.IsInit != nil { |
|||
return *m.IsInit |
|||
} |
|||
return false |
|||
} |
|||
|
|||
func (m *JoinMessage) GetIp() string { |
|||
if m != nil && m.Ip != nil { |
|||
return *m.Ip |
|||
} |
|||
return "" |
|||
} |
|||
|
|||
func (m *JoinMessage) GetPort() uint32 { |
|||
if m != nil && m.Port != nil { |
|||
return *m.Port |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (m *JoinMessage) GetPublicUrl() string { |
|||
if m != nil && m.PublicUrl != nil { |
|||
return *m.PublicUrl |
|||
} |
|||
return "" |
|||
} |
|||
|
|||
func (m *JoinMessage) GetMaxVolumeCount() uint32 { |
|||
if m != nil && m.MaxVolumeCount != nil { |
|||
return *m.MaxVolumeCount |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (m *JoinMessage) GetMaxFileKey() uint64 { |
|||
if m != nil && m.MaxFileKey != nil { |
|||
return *m.MaxFileKey |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (m *JoinMessage) GetDataCenter() string { |
|||
if m != nil && m.DataCenter != nil { |
|||
return *m.DataCenter |
|||
} |
|||
return "" |
|||
} |
|||
|
|||
func (m *JoinMessage) GetRack() string { |
|||
if m != nil && m.Rack != nil { |
|||
return *m.Rack |
|||
} |
|||
return "" |
|||
} |
|||
|
|||
func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage { |
|||
if m != nil { |
|||
return m.Volumes |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (m *JoinMessage) GetAdminPort() uint32 { |
|||
if m != nil && m.AdminPort != nil { |
|||
return *m.AdminPort |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func init() { |
|||
} |
|||
@ -1,27 +0,0 @@ |
|||
package operation; |
|||
|
|||
message VolumeInformationMessage { |
|||
required uint32 id = 1; |
|||
required uint64 size = 2; |
|||
optional string collection = 3; |
|||
required uint64 file_count = 4; |
|||
required uint64 delete_count = 5; |
|||
required uint64 deleted_byte_count = 6; |
|||
optional bool read_only = 7; |
|||
required uint32 replica_placement = 8; |
|||
optional uint32 version = 9 [default=2]; |
|||
optional uint32 ttl = 10; |
|||
} |
|||
|
|||
message JoinMessage { |
|||
optional bool is_init = 1; |
|||
required string ip = 2; |
|||
required uint32 port = 3; |
|||
optional string public_url = 4; |
|||
required uint32 max_volume_count = 5; |
|||
required uint64 max_file_key = 6; |
|||
optional string data_center = 7; |
|||
optional string rack = 8; |
|||
repeated VolumeInformationMessage volumes = 9; |
|||
optional uint32 admin_port = 10; |
|||
} |
|||
@ -1,113 +0,0 @@ |
|||
package stats |
|||
|
|||
import ( |
|||
"time" |
|||
) |
|||
|
|||
type ServerStats struct { |
|||
Requests *DurationCounter |
|||
Connections *DurationCounter |
|||
AssignRequests *DurationCounter |
|||
ReadRequests *DurationCounter |
|||
WriteRequests *DurationCounter |
|||
DeleteRequests *DurationCounter |
|||
BytesIn *DurationCounter |
|||
BytesOut *DurationCounter |
|||
} |
|||
|
|||
type Channels struct { |
|||
Connections chan *TimedValue |
|||
Requests chan *TimedValue |
|||
AssignRequests chan *TimedValue |
|||
ReadRequests chan *TimedValue |
|||
WriteRequests chan *TimedValue |
|||
DeleteRequests chan *TimedValue |
|||
BytesIn chan *TimedValue |
|||
BytesOut chan *TimedValue |
|||
} |
|||
|
|||
var ( |
|||
Chan *Channels |
|||
) |
|||
|
|||
func init() { |
|||
Chan = &Channels{ |
|||
Connections: make(chan *TimedValue, 100), |
|||
Requests: make(chan *TimedValue, 100), |
|||
AssignRequests: make(chan *TimedValue, 100), |
|||
ReadRequests: make(chan *TimedValue, 100), |
|||
WriteRequests: make(chan *TimedValue, 100), |
|||
DeleteRequests: make(chan *TimedValue, 100), |
|||
BytesIn: make(chan *TimedValue, 100), |
|||
BytesOut: make(chan *TimedValue, 100), |
|||
} |
|||
} |
|||
|
|||
func NewServerStats() *ServerStats { |
|||
return &ServerStats{ |
|||
Requests: NewDurationCounter(), |
|||
Connections: NewDurationCounter(), |
|||
AssignRequests: NewDurationCounter(), |
|||
ReadRequests: NewDurationCounter(), |
|||
WriteRequests: NewDurationCounter(), |
|||
DeleteRequests: NewDurationCounter(), |
|||
BytesIn: NewDurationCounter(), |
|||
BytesOut: NewDurationCounter(), |
|||
} |
|||
} |
|||
|
|||
func ConnectionOpen() { |
|||
Chan.Connections <- NewTimedValue(time.Now(), 1) |
|||
} |
|||
func ConnectionClose() { |
|||
Chan.Connections <- NewTimedValue(time.Now(), -1) |
|||
} |
|||
func RequestOpen() { |
|||
Chan.Requests <- NewTimedValue(time.Now(), 1) |
|||
} |
|||
func RequestClose() { |
|||
Chan.Requests <- NewTimedValue(time.Now(), -1) |
|||
} |
|||
func AssignRequest() { |
|||
Chan.AssignRequests <- NewTimedValue(time.Now(), 1) |
|||
} |
|||
func ReadRequest() { |
|||
Chan.ReadRequests <- NewTimedValue(time.Now(), 1) |
|||
} |
|||
func WriteRequest() { |
|||
Chan.WriteRequests <- NewTimedValue(time.Now(), 1) |
|||
} |
|||
func DeleteRequest() { |
|||
Chan.DeleteRequests <- NewTimedValue(time.Now(), 1) |
|||
} |
|||
func BytesIn(val int64) { |
|||
Chan.BytesIn <- NewTimedValue(time.Now(), val) |
|||
} |
|||
func BytesOut(val int64) { |
|||
Chan.BytesOut <- NewTimedValue(time.Now(), val) |
|||
} |
|||
|
|||
func (ss *ServerStats) Start() { |
|||
for { |
|||
select { |
|||
case tv := <-Chan.Connections: |
|||
ss.Connections.Add(tv) |
|||
case tv := <-Chan.Requests: |
|||
ss.Requests.Add(tv) |
|||
case tv := <-Chan.AssignRequests: |
|||
ss.AssignRequests.Add(tv) |
|||
case tv := <-Chan.ReadRequests: |
|||
ss.ReadRequests.Add(tv) |
|||
case tv := <-Chan.WriteRequests: |
|||
ss.WriteRequests.Add(tv) |
|||
case tv := <-Chan.ReadRequests: |
|||
ss.ReadRequests.Add(tv) |
|||
case tv := <-Chan.DeleteRequests: |
|||
ss.DeleteRequests.Add(tv) |
|||
case tv := <-Chan.BytesIn: |
|||
ss.BytesIn.Add(tv) |
|||
case tv := <-Chan.BytesOut: |
|||
ss.BytesOut.Add(tv) |
|||
} |
|||
} |
|||
} |
|||
@ -1,123 +0,0 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"fmt" |
|||
"io/ioutil" |
|||
"os" |
|||
"sync" |
|||
|
|||
"github.com/chrislusf/seaweedfs/go/util" |
|||
) |
|||
|
|||
type NeedleMapType int |
|||
|
|||
const ( |
|||
NeedleMapInMemory NeedleMapType = iota |
|||
NeedleMapLevelDb |
|||
NeedleMapBoltDb |
|||
) |
|||
|
|||
type NeedleMapper interface { |
|||
Put(key uint64, offset uint32, size uint32) error |
|||
Get(key uint64) (element *NeedleValue, ok bool) |
|||
Delete(key uint64) error |
|||
Close() |
|||
Destroy() error |
|||
ContentSize() uint64 |
|||
DeletedSize() uint64 |
|||
FileCount() int |
|||
DeletedCount() int |
|||
MaxFileKey() uint64 |
|||
IndexFileSize() uint64 |
|||
IndexFileContent() ([]byte, error) |
|||
IndexFileName() string |
|||
} |
|||
|
|||
type baseNeedleMapper struct { |
|||
indexFile *os.File |
|||
indexFileAccessLock sync.Mutex |
|||
|
|||
mapMetric |
|||
} |
|||
|
|||
func (nm baseNeedleMapper) IndexFileSize() uint64 { |
|||
stat, err := nm.indexFile.Stat() |
|||
if err == nil { |
|||
return uint64(stat.Size()) |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (nm baseNeedleMapper) IndexFileName() string { |
|||
return nm.indexFile.Name() |
|||
} |
|||
|
|||
func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) { |
|||
key = util.BytesToUint64(bytes[:8]) |
|||
offset = util.BytesToUint32(bytes[8:12]) |
|||
size = util.BytesToUint32(bytes[12:16]) |
|||
return |
|||
} |
|||
func (nm baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error { |
|||
bytes := make([]byte, 16) |
|||
util.Uint64toBytes(bytes[0:8], key) |
|||
util.Uint32toBytes(bytes[8:12], offset) |
|||
util.Uint32toBytes(bytes[12:16], size) |
|||
|
|||
nm.indexFileAccessLock.Lock() |
|||
defer nm.indexFileAccessLock.Unlock() |
|||
if _, err := nm.indexFile.Seek(0, 2); err != nil { |
|||
return fmt.Errorf("cannot seek end of indexfile %s: %v", |
|||
nm.indexFile.Name(), err) |
|||
} |
|||
_, err := nm.indexFile.Write(bytes) |
|||
return err |
|||
} |
|||
func (nm baseNeedleMapper) IndexFileContent() ([]byte, error) { |
|||
nm.indexFileAccessLock.Lock() |
|||
defer nm.indexFileAccessLock.Unlock() |
|||
return ioutil.ReadFile(nm.indexFile.Name()) |
|||
} |
|||
|
|||
type mapMetric struct { |
|||
indexFile *os.File |
|||
|
|||
DeletionCounter int `json:"DeletionCounter"` |
|||
FileCounter int `json:"FileCounter"` |
|||
DeletionByteCounter uint64 `json:"DeletionByteCounter"` |
|||
FileByteCounter uint64 `json:"FileByteCounter"` |
|||
MaximumFileKey uint64 `json:"MaxFileKey"` |
|||
} |
|||
|
|||
func (mm *mapMetric) logDelete(deletedByteCount uint32) { |
|||
mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount) |
|||
mm.DeletionCounter++ |
|||
} |
|||
|
|||
func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) { |
|||
if key > mm.MaximumFileKey { |
|||
mm.MaximumFileKey = key |
|||
} |
|||
mm.FileCounter++ |
|||
mm.FileByteCounter = mm.FileByteCounter + uint64(newSize) |
|||
if oldSize > 0 { |
|||
mm.DeletionCounter++ |
|||
mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize) |
|||
} |
|||
} |
|||
|
|||
func (mm mapMetric) ContentSize() uint64 { |
|||
return mm.FileByteCounter |
|||
} |
|||
func (mm mapMetric) DeletedSize() uint64 { |
|||
return mm.DeletionByteCounter |
|||
} |
|||
func (mm mapMetric) FileCount() int { |
|||
return mm.FileCounter |
|||
} |
|||
func (mm mapMetric) DeletedCount() int { |
|||
return mm.DeletionCounter |
|||
} |
|||
func (mm mapMetric) MaxFileKey() uint64 { |
|||
return mm.MaximumFileKey |
|||
} |
|||
@ -1,389 +0,0 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"errors" |
|||
"fmt" |
|||
"io/ioutil" |
|||
"math/rand" |
|||
"strconv" |
|||
"strings" |
|||
|
|||
"github.com/chrislusf/seaweedfs/go/glog" |
|||
"github.com/chrislusf/seaweedfs/go/operation" |
|||
"github.com/chrislusf/seaweedfs/go/security" |
|||
"github.com/chrislusf/seaweedfs/go/util" |
|||
"github.com/golang/protobuf/proto" |
|||
) |
|||
|
|||
const ( |
|||
MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
|
|||
) |
|||
|
|||
type DiskLocation struct { |
|||
Directory string |
|||
MaxVolumeCount int |
|||
volumes map[VolumeId]*Volume |
|||
} |
|||
|
|||
func (mn *DiskLocation) reset() { |
|||
} |
|||
|
|||
type MasterNodes struct { |
|||
nodes []string |
|||
lastNode int |
|||
} |
|||
|
|||
func (mn *MasterNodes) String() string { |
|||
return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode) |
|||
} |
|||
|
|||
func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) { |
|||
mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1} |
|||
return |
|||
} |
|||
func (mn *MasterNodes) reset() { |
|||
glog.V(4).Infof("Resetting master nodes: %v", mn) |
|||
if len(mn.nodes) > 1 && mn.lastNode >= 0 { |
|||
glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes) |
|||
mn.lastNode = -mn.lastNode - 1 |
|||
} |
|||
} |
|||
func (mn *MasterNodes) findMaster() (string, error) { |
|||
if len(mn.nodes) == 0 { |
|||
return "", errors.New("No master node found!") |
|||
} |
|||
if mn.lastNode < 0 { |
|||
for _, m := range mn.nodes { |
|||
glog.V(4).Infof("Listing masters on %s", m) |
|||
if masters, e := operation.ListMasters(m); e == nil { |
|||
if len(masters) == 0 { |
|||
continue |
|||
} |
|||
mn.nodes = append(masters, m) |
|||
mn.lastNode = rand.Intn(len(mn.nodes)) |
|||
glog.V(2).Infof("current master nodes is %v", mn) |
|||
break |
|||
} else { |
|||
glog.V(4).Infof("Failed listing masters on %s: %v", m, e) |
|||
} |
|||
} |
|||
} |
|||
if mn.lastNode < 0 { |
|||
return "", errors.New("No master node available!") |
|||
} |
|||
return mn.nodes[mn.lastNode], nil |
|||
} |
|||
|
|||
/* |
|||
* A VolumeServer contains one Store |
|||
*/ |
|||
type Store struct { |
|||
Ip string |
|||
Port int |
|||
PublicUrl string |
|||
Locations []*DiskLocation |
|||
dataCenter string //optional informaton, overwriting master setting if exists
|
|||
rack string //optional information, overwriting master setting if exists
|
|||
connected bool |
|||
volumeSizeLimit uint64 //read from the master
|
|||
masterNodes *MasterNodes |
|||
} |
|||
|
|||
func (s *Store) String() (str string) { |
|||
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes) |
|||
return |
|||
} |
|||
|
|||
func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { |
|||
s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} |
|||
s.Locations = make([]*DiskLocation, 0) |
|||
for i := 0; i < len(dirnames); i++ { |
|||
location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]} |
|||
location.volumes = make(map[VolumeId]*Volume) |
|||
location.loadExistingVolumes(needleMapKind) |
|||
s.Locations = append(s.Locations, location) |
|||
} |
|||
return |
|||
} |
|||
func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error { |
|||
rt, e := NewReplicaPlacementFromString(replicaPlacement) |
|||
if e != nil { |
|||
return e |
|||
} |
|||
ttl, e := ReadTTL(ttlString) |
|||
if e != nil { |
|||
return e |
|||
} |
|||
for _, range_string := range strings.Split(volumeListString, ",") { |
|||
if strings.Index(range_string, "-") < 0 { |
|||
id_string := range_string |
|||
id, err := NewVolumeId(id_string) |
|||
if err != nil { |
|||
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) |
|||
} |
|||
e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl) |
|||
} else { |
|||
pair := strings.Split(range_string, "-") |
|||
start, start_err := strconv.ParseUint(pair[0], 10, 64) |
|||
if start_err != nil { |
|||
return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0]) |
|||
} |
|||
end, end_err := strconv.ParseUint(pair[1], 10, 64) |
|||
if end_err != nil { |
|||
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) |
|||
} |
|||
for id := start; id <= end; id++ { |
|||
if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil { |
|||
e = err |
|||
} |
|||
} |
|||
} |
|||
} |
|||
return e |
|||
} |
|||
func (s *Store) DeleteCollection(collection string) (e error) { |
|||
for _, location := range s.Locations { |
|||
for k, v := range location.volumes { |
|||
if v.Collection == collection { |
|||
e = v.Destroy() |
|||
if e != nil { |
|||
return |
|||
} |
|||
delete(location.volumes, k) |
|||
} |
|||
} |
|||
} |
|||
return |
|||
} |
|||
func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) { |
|||
e = v.Destroy() |
|||
if e != nil { |
|||
return |
|||
} |
|||
delete(volumes, v.Id) |
|||
return |
|||
} |
|||
func (s *Store) findVolume(vid VolumeId) *Volume { |
|||
for _, location := range s.Locations { |
|||
if v, found := location.volumes[vid]; found { |
|||
return v |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
func (s *Store) findFreeLocation() (ret *DiskLocation) { |
|||
max := 0 |
|||
for _, location := range s.Locations { |
|||
currentFreeCount := location.MaxVolumeCount - len(location.volumes) |
|||
if currentFreeCount > max { |
|||
max = currentFreeCount |
|||
ret = location |
|||
} |
|||
} |
|||
return ret |
|||
} |
|||
func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error { |
|||
if s.findVolume(vid) != nil { |
|||
return fmt.Errorf("Volume Id %d already exists!", vid) |
|||
} |
|||
if location := s.findFreeLocation(); location != nil { |
|||
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", |
|||
location.Directory, vid, collection, replicaPlacement, ttl) |
|||
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil { |
|||
location.volumes[vid] = volume |
|||
return nil |
|||
} else { |
|||
return err |
|||
} |
|||
} |
|||
return fmt.Errorf("No more free space left") |
|||
} |
|||
|
|||
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { |
|||
if dirs, err := ioutil.ReadDir(l.Directory); err == nil { |
|||
for _, dir := range dirs { |
|||
name := dir.Name() |
|||
if !dir.IsDir() && strings.HasSuffix(name, ".dat") { |
|||
collection := "" |
|||
base := name[:len(name)-len(".dat")] |
|||
i := strings.LastIndex(base, "_") |
|||
if i > 0 { |
|||
collection, base = base[0:i], base[i+1:] |
|||
} |
|||
if vid, err := NewVolumeId(base); err == nil { |
|||
if l.volumes[vid] == nil { |
|||
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil { |
|||
l.volumes[vid] = v |
|||
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) |
|||
} else { |
|||
glog.V(0).Infof("new volume %s error %s", name, e) |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount) |
|||
} |
|||
func (s *Store) Status() []*VolumeInfo { |
|||
var stats []*VolumeInfo |
|||
for _, location := range s.Locations { |
|||
for k, v := range location.volumes { |
|||
s := &VolumeInfo{ |
|||
Id: VolumeId(k), |
|||
Size: v.ContentSize(), |
|||
Collection: v.Collection, |
|||
ReplicaPlacement: v.ReplicaPlacement, |
|||
Version: v.Version(), |
|||
FileCount: v.nm.FileCount(), |
|||
DeleteCount: v.nm.DeletedCount(), |
|||
DeletedByteCount: v.nm.DeletedSize(), |
|||
ReadOnly: v.readOnly, |
|||
Ttl: v.Ttl} |
|||
stats = append(stats, s) |
|||
} |
|||
} |
|||
sortVolumeInfos(stats) |
|||
return stats |
|||
} |
|||
|
|||
func (s *Store) SetDataCenter(dataCenter string) { |
|||
s.dataCenter = dataCenter |
|||
} |
|||
func (s *Store) SetRack(rack string) { |
|||
s.rack = rack |
|||
} |
|||
|
|||
func (s *Store) SetBootstrapMaster(bootstrapMaster string) { |
|||
s.masterNodes = NewMasterNodes(bootstrapMaster) |
|||
} |
|||
func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) { |
|||
masterNode, e = s.masterNodes.findMaster() |
|||
if e != nil { |
|||
return |
|||
} |
|||
var volumeMessages []*operation.VolumeInformationMessage |
|||
maxVolumeCount := 0 |
|||
var maxFileKey uint64 |
|||
for _, location := range s.Locations { |
|||
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount |
|||
for k, v := range location.volumes { |
|||
if maxFileKey < v.nm.MaxFileKey() { |
|||
maxFileKey = v.nm.MaxFileKey() |
|||
} |
|||
if !v.expired(s.volumeSizeLimit) { |
|||
volumeMessage := &operation.VolumeInformationMessage{ |
|||
Id: proto.Uint32(uint32(k)), |
|||
Size: proto.Uint64(uint64(v.Size())), |
|||
Collection: proto.String(v.Collection), |
|||
FileCount: proto.Uint64(uint64(v.nm.FileCount())), |
|||
DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), |
|||
DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), |
|||
ReadOnly: proto.Bool(v.readOnly), |
|||
ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), |
|||
Version: proto.Uint32(uint32(v.Version())), |
|||
Ttl: proto.Uint32(v.Ttl.ToUint32()), |
|||
} |
|||
volumeMessages = append(volumeMessages, volumeMessage) |
|||
} else { |
|||
if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { |
|||
s.DeleteVolume(location.volumes, v) |
|||
glog.V(0).Infoln("volume", v.Id, "is deleted.") |
|||
} else { |
|||
glog.V(0).Infoln("volume", v.Id, "is expired.") |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
joinMessage := &operation.JoinMessage{ |
|||
IsInit: proto.Bool(!s.connected), |
|||
Ip: proto.String(s.Ip), |
|||
Port: proto.Uint32(uint32(s.Port)), |
|||
PublicUrl: proto.String(s.PublicUrl), |
|||
MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)), |
|||
MaxFileKey: proto.Uint64(maxFileKey), |
|||
DataCenter: proto.String(s.dataCenter), |
|||
Rack: proto.String(s.rack), |
|||
Volumes: volumeMessages, |
|||
} |
|||
|
|||
data, err := proto.Marshal(joinMessage) |
|||
if err != nil { |
|||
return "", "", err |
|||
} |
|||
|
|||
joinUrl := "http://" + masterNode + "/dir/join" |
|||
glog.V(4).Infof("Connecting to %s ...", joinUrl) |
|||
|
|||
jsonBlob, err := util.PostBytes(joinUrl, data) |
|||
if err != nil { |
|||
s.masterNodes.reset() |
|||
return "", "", err |
|||
} |
|||
var ret operation.JoinResult |
|||
if err := json.Unmarshal(jsonBlob, &ret); err != nil { |
|||
glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob)) |
|||
s.masterNodes.reset() |
|||
return masterNode, "", err |
|||
} |
|||
if ret.Error != "" { |
|||
s.masterNodes.reset() |
|||
return masterNode, "", errors.New(ret.Error) |
|||
} |
|||
s.volumeSizeLimit = ret.VolumeSizeLimit |
|||
secretKey = security.Secret(ret.SecretKey) |
|||
s.connected = true |
|||
return |
|||
} |
|||
func (s *Store) Close() { |
|||
for _, location := range s.Locations { |
|||
for _, v := range location.volumes { |
|||
v.Close() |
|||
} |
|||
} |
|||
} |
|||
func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { |
|||
if v := s.findVolume(i); v != nil { |
|||
if v.readOnly { |
|||
err = fmt.Errorf("Volume %d is read only", i) |
|||
return |
|||
} |
|||
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) { |
|||
size, err = v.write(n) |
|||
} else { |
|||
err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize()) |
|||
} |
|||
if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) { |
|||
glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit) |
|||
if _, _, e := s.SendHeartbeatToMaster(); e != nil { |
|||
glog.V(0).Infoln("error when reporting size:", e) |
|||
} |
|||
} |
|||
return |
|||
} |
|||
glog.V(0).Infoln("volume", i, "not found!") |
|||
err = fmt.Errorf("Volume %d not found!", i) |
|||
return |
|||
} |
|||
func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { |
|||
if v := s.findVolume(i); v != nil && !v.readOnly { |
|||
return v.delete(n) |
|||
} |
|||
return 0, nil |
|||
} |
|||
func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) { |
|||
if v := s.findVolume(i); v != nil { |
|||
return v.readNeedle(n) |
|||
} |
|||
return 0, fmt.Errorf("Volume %v not found!", i) |
|||
} |
|||
func (s *Store) GetVolume(i VolumeId) *Volume { |
|||
return s.findVolume(i) |
|||
} |
|||
|
|||
func (s *Store) HasVolume(i VolumeId) bool { |
|||
v := s.findVolume(i) |
|||
return v != nil |
|||
} |
|||
@ -1,65 +0,0 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/go/operation" |
|||
"sort" |
|||
) |
|||
|
|||
type VolumeInfo struct { |
|||
Id VolumeId |
|||
Size uint64 |
|||
ReplicaPlacement *ReplicaPlacement |
|||
Ttl *TTL |
|||
Collection string |
|||
Version Version |
|||
FileCount int |
|||
DeleteCount int |
|||
DeletedByteCount uint64 |
|||
ReadOnly bool |
|||
} |
|||
|
|||
func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) { |
|||
vi = VolumeInfo{ |
|||
Id: VolumeId(*m.Id), |
|||
Size: *m.Size, |
|||
Collection: *m.Collection, |
|||
FileCount: int(*m.FileCount), |
|||
DeleteCount: int(*m.DeleteCount), |
|||
DeletedByteCount: *m.DeletedByteCount, |
|||
ReadOnly: *m.ReadOnly, |
|||
Version: Version(*m.Version), |
|||
} |
|||
rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement)) |
|||
if e != nil { |
|||
return vi, e |
|||
} |
|||
vi.ReplicaPlacement = rp |
|||
vi.Ttl = LoadTTLFromUint32(*m.Ttl) |
|||
return vi, nil |
|||
} |
|||
|
|||
func (vi VolumeInfo) String() string { |
|||
return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v", |
|||
vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly) |
|||
} |
|||
|
|||
/*VolumesInfo sorting*/ |
|||
|
|||
type volumeInfos []*VolumeInfo |
|||
|
|||
func (vis volumeInfos) Len() int { |
|||
return len(vis) |
|||
} |
|||
|
|||
func (vis volumeInfos) Less(i, j int) bool { |
|||
return vis[i].Id < vis[j].Id |
|||
} |
|||
|
|||
func (vis volumeInfos) Swap(i, j int) { |
|||
vis[i], vis[j] = vis[j], vis[i] |
|||
} |
|||
|
|||
func sortVolumeInfos(vis volumeInfos) { |
|||
sort.Sort(vis) |
|||
} |
|||
@ -1,57 +0,0 @@ |
|||
package topology |
|||
|
|||
import ( |
|||
"fmt" |
|||
|
|||
"github.com/chrislusf/seaweedfs/go/storage" |
|||
"github.com/chrislusf/seaweedfs/go/util" |
|||
) |
|||
|
|||
type Collection struct { |
|||
Name string |
|||
volumeSizeLimit uint64 |
|||
storageType2VolumeLayout *util.ConcurrentReadMap |
|||
} |
|||
|
|||
func NewCollection(name string, volumeSizeLimit uint64) *Collection { |
|||
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} |
|||
c.storageType2VolumeLayout = util.NewConcurrentReadMap() |
|||
return c |
|||
} |
|||
|
|||
func (c *Collection) String() string { |
|||
return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout) |
|||
} |
|||
|
|||
func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { |
|||
keyString := rp.String() |
|||
if ttl != nil { |
|||
keyString += ttl.String() |
|||
} |
|||
vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} { |
|||
return NewVolumeLayout(rp, ttl, c.volumeSizeLimit) |
|||
}) |
|||
return vl.(*VolumeLayout) |
|||
} |
|||
|
|||
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { |
|||
for _, vl := range c.storageType2VolumeLayout.Items { |
|||
if vl != nil { |
|||
if list := vl.(*VolumeLayout).Lookup(vid); list != nil { |
|||
return list |
|||
} |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (c *Collection) ListVolumeServers() (nodes []*DataNode) { |
|||
for _, vl := range c.storageType2VolumeLayout.Items { |
|||
if vl != nil { |
|||
if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil { |
|||
nodes = append(nodes, list...) |
|||
} |
|||
} |
|||
} |
|||
return |
|||
} |
|||
@ -1,100 +0,0 @@ |
|||
package topology |
|||
|
|||
import ( |
|||
"fmt" |
|||
"strconv" |
|||
|
|||
"github.com/chrislusf/seaweedfs/go/glog" |
|||
"github.com/chrislusf/seaweedfs/go/storage" |
|||
) |
|||
|
|||
type DataNode struct { |
|||
NodeImpl |
|||
volumes map[storage.VolumeId]storage.VolumeInfo |
|||
Ip string |
|||
Port int |
|||
PublicUrl string |
|||
LastSeen int64 // unix time in seconds
|
|||
Dead bool |
|||
} |
|||
|
|||
func NewDataNode(id string) *DataNode { |
|||
s := &DataNode{} |
|||
s.id = NodeId(id) |
|||
s.nodeType = "DataNode" |
|||
s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) |
|||
s.NodeImpl.value = s |
|||
return s |
|||
} |
|||
|
|||
func (dn *DataNode) String() string { |
|||
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead) |
|||
} |
|||
|
|||
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { |
|||
if _, ok := dn.volumes[v.Id]; !ok { |
|||
dn.volumes[v.Id] = v |
|||
dn.UpAdjustVolumeCountDelta(1) |
|||
if !v.ReadOnly { |
|||
dn.UpAdjustActiveVolumeCountDelta(1) |
|||
} |
|||
dn.UpAdjustMaxVolumeId(v.Id) |
|||
} else { |
|||
dn.volumes[v.Id] = v |
|||
} |
|||
} |
|||
|
|||
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) { |
|||
actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) |
|||
for _, v := range actualVolumes { |
|||
actualVolumeMap[v.Id] = v |
|||
} |
|||
for vid, v := range dn.volumes { |
|||
if _, ok := actualVolumeMap[vid]; !ok { |
|||
glog.V(0).Infoln("Deleting volume id:", vid) |
|||
delete(dn.volumes, vid) |
|||
deletedVolumes = append(deletedVolumes, v) |
|||
dn.UpAdjustVolumeCountDelta(-1) |
|||
dn.UpAdjustActiveVolumeCountDelta(-1) |
|||
} |
|||
} //TODO: adjust max volume id, if need to reclaim volume ids
|
|||
for _, v := range actualVolumes { |
|||
dn.AddOrUpdateVolume(v) |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (dn *DataNode) GetDataCenter() *DataCenter { |
|||
return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter) |
|||
} |
|||
|
|||
func (dn *DataNode) GetRack() *Rack { |
|||
return dn.Parent().(*NodeImpl).value.(*Rack) |
|||
} |
|||
|
|||
func (dn *DataNode) GetTopology() *Topology { |
|||
p := dn.Parent() |
|||
for p.Parent() != nil { |
|||
p = p.Parent() |
|||
} |
|||
t := p.(*Topology) |
|||
return t |
|||
} |
|||
|
|||
func (dn *DataNode) MatchLocation(ip string, port int) bool { |
|||
return dn.Ip == ip && dn.Port == port |
|||
} |
|||
|
|||
func (dn *DataNode) Url() string { |
|||
return dn.Ip + ":" + strconv.Itoa(dn.Port) |
|||
} |
|||
|
|||
func (dn *DataNode) ToMap() interface{} { |
|||
ret := make(map[string]interface{}) |
|||
ret["Url"] = dn.Url() |
|||
ret["Volumes"] = dn.GetVolumeCount() |
|||
ret["Max"] = dn.GetMaxVolumeCount() |
|||
ret["Free"] = dn.FreeSpace() |
|||
ret["PublicUrl"] = dn.PublicUrl |
|||
return ret |
|||
} |
|||
@ -1,126 +0,0 @@ |
|||
package topology |
|||
|
|||
import ( |
|||
"bytes" |
|||
"net/http" |
|||
"strconv" |
|||
|
|||
"net/url" |
|||
|
|||
"github.com/chrislusf/seaweedfs/go/glog" |
|||
"github.com/chrislusf/seaweedfs/go/operation" |
|||
"github.com/chrislusf/seaweedfs/go/security" |
|||
"github.com/chrislusf/seaweedfs/go/storage" |
|||
"github.com/chrislusf/seaweedfs/go/util" |
|||
) |
|||
|
|||
func ReplicatedWrite(masterNode string, s *storage.Store, |
|||
volumeId storage.VolumeId, needle *storage.Needle, |
|||
r *http.Request) (size uint32, errorStatus string) { |
|||
|
|||
//check JWT
|
|||
jwt := security.GetJwt(r) |
|||
|
|||
ret, err := s.Write(volumeId, needle) |
|||
needToReplicate := !s.HasVolume(volumeId) |
|||
if err != nil { |
|||
errorStatus = "Failed to write to local disk (" + err.Error() + ")" |
|||
} else if ret > 0 { |
|||
needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate() |
|||
} else { |
|||
errorStatus = "Failed to write to local disk" |
|||
} |
|||
if !needToReplicate && ret > 0 { |
|||
needToReplicate = s.GetVolume(volumeId).NeedToReplicate() |
|||
} |
|||
if needToReplicate { //send to other replica locations
|
|||
if r.FormValue("type") != "replicate" { |
|||
|
|||
if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { |
|||
u := url.URL{ |
|||
Scheme: "http", |
|||
Host: location.Url, |
|||
Path: r.URL.Path, |
|||
} |
|||
q := url.Values{ |
|||
"type": {"replicate"}, |
|||
} |
|||
if needle.LastModified > 0 { |
|||
q.Set("ts", strconv.FormatUint(needle.LastModified, 10)) |
|||
} |
|||
if needle.IsChunkedManifest() { |
|||
q.Set("cm", "true") |
|||
} |
|||
u.RawQuery = q.Encode() |
|||
_, err := operation.Upload(u.String(), |
|||
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), |
|||
jwt) |
|||
return err == nil |
|||
}) { |
|||
ret = 0 |
|||
errorStatus = "Failed to write to replicas for volume " + volumeId.String() |
|||
} |
|||
} |
|||
} |
|||
size = ret |
|||
return |
|||
} |
|||
|
|||
func ReplicatedDelete(masterNode string, store *storage.Store, |
|||
volumeId storage.VolumeId, n *storage.Needle, |
|||
r *http.Request) (ret uint32) { |
|||
|
|||
//check JWT
|
|||
jwt := security.GetJwt(r) |
|||
|
|||
ret, err := store.Delete(volumeId, n) |
|||
if err != nil { |
|||
glog.V(0).Infoln("delete error:", err) |
|||
return |
|||
} |
|||
|
|||
needToReplicate := !store.HasVolume(volumeId) |
|||
if !needToReplicate && ret > 0 { |
|||
needToReplicate = store.GetVolume(volumeId).NeedToReplicate() |
|||
} |
|||
if needToReplicate { //send to other replica locations
|
|||
if r.FormValue("type") != "replicate" { |
|||
if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool { |
|||
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt) |
|||
}) { |
|||
ret = 0 |
|||
} |
|||
} |
|||
} |
|||
return |
|||
} |
|||
|
|||
func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool { |
|||
if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { |
|||
length := 0 |
|||
selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) |
|||
results := make(chan bool) |
|||
for _, location := range lookupResult.Locations { |
|||
if location.Url != selfUrl { |
|||
length++ |
|||
go func(location operation.Location, results chan bool) { |
|||
results <- op(location) |
|||
}(location, results) |
|||
} |
|||
} |
|||
ret := true |
|||
for i := 0; i < length; i++ { |
|||
ret = ret && <-results |
|||
} |
|||
if volume := store.GetVolume(volumeId); volume != nil { |
|||
if length+1 < volume.ReplicaPlacement.GetCopyCount() { |
|||
glog.V(0).Infof("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount()) |
|||
ret = false |
|||
} |
|||
} |
|||
return ret |
|||
} else { |
|||
glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error()) |
|||
} |
|||
return false |
|||
} |
|||
@ -1,189 +0,0 @@ |
|||
package topology |
|||
|
|||
import ( |
|||
"errors" |
|||
"io/ioutil" |
|||
"math/rand" |
|||
|
|||
"github.com/chrislusf/raft" |
|||
"github.com/chrislusf/seaweedfs/go/glog" |
|||
"github.com/chrislusf/seaweedfs/go/operation" |
|||
"github.com/chrislusf/seaweedfs/go/sequence" |
|||
"github.com/chrislusf/seaweedfs/go/storage" |
|||
"github.com/chrislusf/seaweedfs/go/util" |
|||
) |
|||
|
|||
type Topology struct { |
|||
NodeImpl |
|||
|
|||
collectionMap *util.ConcurrentReadMap |
|||
|
|||
pulse int64 |
|||
|
|||
volumeSizeLimit uint64 |
|||
|
|||
Sequence sequence.Sequencer |
|||
|
|||
chanDeadDataNodes chan *DataNode |
|||
chanRecoveredDataNodes chan *DataNode |
|||
chanFullVolumes chan storage.VolumeInfo |
|||
|
|||
configuration *Configuration |
|||
|
|||
RaftServer raft.Server |
|||
} |
|||
|
|||
func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) { |
|||
t := &Topology{} |
|||
t.id = NodeId(id) |
|||
t.nodeType = "Topology" |
|||
t.NodeImpl.value = t |
|||
t.children = make(map[NodeId]Node) |
|||
t.collectionMap = util.NewConcurrentReadMap() |
|||
t.pulse = int64(pulse) |
|||
t.volumeSizeLimit = volumeSizeLimit |
|||
|
|||
t.Sequence = seq |
|||
|
|||
t.chanDeadDataNodes = make(chan *DataNode) |
|||
t.chanRecoveredDataNodes = make(chan *DataNode) |
|||
t.chanFullVolumes = make(chan storage.VolumeInfo) |
|||
|
|||
err := t.loadConfiguration(confFile) |
|||
|
|||
return t, err |
|||
} |
|||
|
|||
func (t *Topology) IsLeader() bool { |
|||
if leader, e := t.Leader(); e == nil { |
|||
return leader == t.RaftServer.Name() |
|||
} |
|||
return false |
|||
} |
|||
|
|||
func (t *Topology) Leader() (string, error) { |
|||
l := "" |
|||
if t.RaftServer != nil { |
|||
l = t.RaftServer.Leader() |
|||
} else { |
|||
return "", errors.New("Raft Server not ready yet!") |
|||
} |
|||
|
|||
if l == "" { |
|||
// We are a single node cluster, we are the leader
|
|||
return t.RaftServer.Name(), errors.New("Raft Server not initialized!") |
|||
} |
|||
|
|||
return l, nil |
|||
} |
|||
|
|||
func (t *Topology) loadConfiguration(configurationFile string) error { |
|||
b, e := ioutil.ReadFile(configurationFile) |
|||
if e == nil { |
|||
t.configuration, e = NewConfiguration(b) |
|||
return e |
|||
} |
|||
glog.V(0).Infoln("Using default configurations.") |
|||
return nil |
|||
} |
|||
|
|||
func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { |
|||
//maybe an issue if lots of collections?
|
|||
if collection == "" { |
|||
for _, c := range t.collectionMap.Items { |
|||
if list := c.(*Collection).Lookup(vid); list != nil { |
|||
return list |
|||
} |
|||
} |
|||
} else { |
|||
if c, ok := t.collectionMap.Items[collection]; ok { |
|||
return c.(*Collection).Lookup(vid) |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (t *Topology) NextVolumeId() storage.VolumeId { |
|||
vid := t.GetMaxVolumeId() |
|||
next := vid.Next() |
|||
go t.RaftServer.Do(NewMaxVolumeIdCommand(next)) |
|||
return next |
|||
} |
|||
|
|||
func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { |
|||
vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) |
|||
return vl.GetActiveVolumeCount(option) > 0 |
|||
} |
|||
|
|||
func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { |
|||
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) |
|||
if err != nil || datanodes.Length() == 0 { |
|||
return "", 0, nil, errors.New("No writable volumes available!") |
|||
} |
|||
fileId, count := t.Sequence.NextFileId(count) |
|||
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil |
|||
} |
|||
|
|||
func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { |
|||
return t.collectionMap.Get(collectionName, func() interface{} { |
|||
return NewCollection(collectionName, t.volumeSizeLimit) |
|||
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl) |
|||
} |
|||
|
|||
func (t *Topology) GetCollection(collectionName string) (*Collection, bool) { |
|||
c, hasCollection := t.collectionMap.Items[collectionName] |
|||
return c.(*Collection), hasCollection |
|||
} |
|||
|
|||
func (t *Topology) DeleteCollection(collectionName string) { |
|||
delete(t.collectionMap.Items, collectionName) |
|||
} |
|||
|
|||
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { |
|||
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn) |
|||
} |
|||
func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { |
|||
glog.Infof("removing volume info:%+v", v) |
|||
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn) |
|||
} |
|||
|
|||
func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { |
|||
t.Sequence.SetMax(*joinMessage.MaxFileKey) |
|||
dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack) |
|||
dc := t.GetOrCreateDataCenter(dcName) |
|||
rack := dc.GetOrCreateRack(rackName) |
|||
dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port)) |
|||
if *joinMessage.IsInit && dn != nil { |
|||
t.UnRegisterDataNode(dn) |
|||
} |
|||
dn = rack.GetOrCreateDataNode(*joinMessage.Ip, |
|||
int(*joinMessage.Port), *joinMessage.PublicUrl, |
|||
int(*joinMessage.MaxVolumeCount)) |
|||
var volumeInfos []storage.VolumeInfo |
|||
for _, v := range joinMessage.Volumes { |
|||
if vi, err := storage.NewVolumeInfo(v); err == nil { |
|||
volumeInfos = append(volumeInfos, vi) |
|||
} else { |
|||
glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) |
|||
} |
|||
} |
|||
deletedVolumes := dn.UpdateVolumes(volumeInfos) |
|||
for _, v := range volumeInfos { |
|||
t.RegisterVolumeLayout(v, dn) |
|||
} |
|||
for _, v := range deletedVolumes { |
|||
t.UnRegisterVolumeLayout(v, dn) |
|||
} |
|||
} |
|||
|
|||
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { |
|||
for _, c := range t.Children() { |
|||
dc := c.(*DataCenter) |
|||
if string(dc.Id()) == dcName { |
|||
return dc |
|||
} |
|||
} |
|||
dc := NewDataCenter(dcName) |
|||
t.LinkChildNode(dc) |
|||
return dc |
|||
} |
|||
@ -1,211 +0,0 @@ |
|||
package topology |
|||
|
|||
import ( |
|||
"fmt" |
|||
"math/rand" |
|||
"sync" |
|||
|
|||
"github.com/chrislusf/seaweedfs/go/glog" |
|||
"github.com/chrislusf/seaweedfs/go/storage" |
|||
) |
|||
|
|||
/* |
|||
This package is created to resolve these replica placement issues: |
|||
1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies |
|||
2. in time of tight storage, how to reduce replica level |
|||
3. optimizing for hot data on faster disk, cold data on cheaper storage, |
|||
4. volume allocation for each bucket |
|||
*/ |
|||
|
|||
type VolumeGrowOption struct { |
|||
Collection string |
|||
ReplicaPlacement *storage.ReplicaPlacement |
|||
Ttl *storage.TTL |
|||
DataCenter string |
|||
Rack string |
|||
DataNode string |
|||
} |
|||
|
|||
type VolumeGrowth struct { |
|||
accessLock sync.Mutex |
|||
} |
|||
|
|||
func (o *VolumeGrowOption) String() string { |
|||
return fmt.Sprintf("Collection:%s, ReplicaPlacement:%v, Ttl:%v, DataCenter:%s, Rack:%s, DataNode:%s", o.Collection, o.ReplicaPlacement, o.Ttl, o.DataCenter, o.Rack, o.DataNode) |
|||
} |
|||
|
|||
func NewDefaultVolumeGrowth() *VolumeGrowth { |
|||
return &VolumeGrowth{} |
|||
} |
|||
|
|||
// one replication type may need rp.GetCopyCount() actual volumes
|
|||
// given copyCount, how many logical volumes to create
|
|||
func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) { |
|||
switch copyCount { |
|||
case 1: |
|||
count = 7 |
|||
case 2: |
|||
count = 6 |
|||
case 3: |
|||
count = 3 |
|||
default: |
|||
count = 1 |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) { |
|||
count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo) |
|||
if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 { |
|||
return count, nil |
|||
} |
|||
return count, err |
|||
} |
|||
func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) { |
|||
vg.accessLock.Lock() |
|||
defer vg.accessLock.Unlock() |
|||
|
|||
for i := 0; i < targetCount; i++ { |
|||
if c, e := vg.findAndGrow(topo, option); e == nil { |
|||
counter += c |
|||
} else { |
|||
return counter, e |
|||
} |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) { |
|||
servers, e := vg.findEmptySlotsForOneVolume(topo, option) |
|||
if e != nil { |
|||
return 0, e |
|||
} |
|||
vid := topo.NextVolumeId() |
|||
err := vg.grow(topo, vid, option, servers...) |
|||
return len(servers), err |
|||
} |
|||
|
|||
// 1. find the main data node
|
|||
// 1.1 collect all data nodes that have 1 slots
|
|||
// 2.2 collect all racks that have rp.SameRackCount+1
|
|||
// 2.2 collect all data centers that have DiffRackCount+rp.SameRackCount+1
|
|||
// 2. find rest data nodes
|
|||
func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { |
|||
//find main datacenter and other data centers
|
|||
rp := option.ReplicaPlacement |
|||
mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error { |
|||
if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { |
|||
return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) |
|||
} |
|||
if len(node.Children()) < rp.DiffRackCount+1 { |
|||
return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) |
|||
} |
|||
if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 { |
|||
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1) |
|||
} |
|||
possibleRacksCount := 0 |
|||
for _, rack := range node.Children() { |
|||
possibleDataNodesCount := 0 |
|||
for _, n := range rack.Children() { |
|||
if n.FreeSpace() >= 1 { |
|||
possibleDataNodesCount++ |
|||
} |
|||
} |
|||
if possibleDataNodesCount >= rp.SameRackCount+1 { |
|||
possibleRacksCount++ |
|||
} |
|||
} |
|||
if possibleRacksCount < rp.DiffRackCount+1 { |
|||
return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1) |
|||
} |
|||
return nil |
|||
}) |
|||
if dc_err != nil { |
|||
return nil, dc_err |
|||
} |
|||
|
|||
//find main rack and other racks
|
|||
mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error { |
|||
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { |
|||
return fmt.Errorf("Not matching preferred rack:%s", option.Rack) |
|||
} |
|||
if node.FreeSpace() < rp.SameRackCount+1 { |
|||
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1) |
|||
} |
|||
if len(node.Children()) < rp.SameRackCount+1 { |
|||
// a bit faster way to test free racks
|
|||
return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1) |
|||
} |
|||
possibleDataNodesCount := 0 |
|||
for _, n := range node.Children() { |
|||
if n.FreeSpace() >= 1 { |
|||
possibleDataNodesCount++ |
|||
} |
|||
} |
|||
if possibleDataNodesCount < rp.SameRackCount+1 { |
|||
return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1) |
|||
} |
|||
return nil |
|||
}) |
|||
if rack_err != nil { |
|||
return nil, rack_err |
|||
} |
|||
|
|||
//find main rack and other racks
|
|||
mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error { |
|||
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { |
|||
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) |
|||
} |
|||
if node.FreeSpace() < 1 { |
|||
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1) |
|||
} |
|||
return nil |
|||
}) |
|||
if server_err != nil { |
|||
return nil, server_err |
|||
} |
|||
|
|||
servers = append(servers, mainServer.(*DataNode)) |
|||
for _, server := range otherServers { |
|||
servers = append(servers, server.(*DataNode)) |
|||
} |
|||
for _, rack := range otherRacks { |
|||
r := rand.Intn(rack.FreeSpace()) |
|||
if server, e := rack.ReserveOneVolume(r); e == nil { |
|||
servers = append(servers, server) |
|||
} else { |
|||
return servers, e |
|||
} |
|||
} |
|||
for _, datacenter := range otherDataCenters { |
|||
r := rand.Intn(datacenter.FreeSpace()) |
|||
if server, e := datacenter.ReserveOneVolume(r); e == nil { |
|||
servers = append(servers, server) |
|||
} else { |
|||
return servers, e |
|||
} |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { |
|||
for _, server := range servers { |
|||
if err := AllocateVolume(server, vid, option); err == nil { |
|||
vi := storage.VolumeInfo{ |
|||
Id: vid, |
|||
Size: 0, |
|||
Collection: option.Collection, |
|||
ReplicaPlacement: option.ReplicaPlacement, |
|||
Ttl: option.Ttl, |
|||
Version: storage.CurrentVersion, |
|||
} |
|||
server.AddOrUpdateVolume(vi) |
|||
topo.RegisterVolumeLayout(vi, server) |
|||
glog.V(0).Infoln("Created Volume", vid, "on", server.NodeImpl.String()) |
|||
} else { |
|||
glog.V(0).Infoln("Failed to assign volume", vid, "to", servers, "error", err) |
|||
return fmt.Errorf("Failed to assign %d: %v", vid, err) |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
@ -1,65 +0,0 @@ |
|||
package topology |
|||
|
|||
import ( |
|||
"fmt" |
|||
) |
|||
|
|||
type VolumeLocationList struct { |
|||
list []*DataNode |
|||
} |
|||
|
|||
func NewVolumeLocationList() *VolumeLocationList { |
|||
return &VolumeLocationList{} |
|||
} |
|||
|
|||
func (dnll *VolumeLocationList) String() string { |
|||
return fmt.Sprintf("%v", dnll.list) |
|||
} |
|||
|
|||
func (dnll *VolumeLocationList) Head() *DataNode { |
|||
//mark first node as master volume
|
|||
return dnll.list[0] |
|||
} |
|||
|
|||
func (dnll *VolumeLocationList) Length() int { |
|||
return len(dnll.list) |
|||
} |
|||
|
|||
func (dnll *VolumeLocationList) Set(loc *DataNode) { |
|||
for i := 0; i < len(dnll.list); i++ { |
|||
if loc.Ip == dnll.list[i].Ip && loc.Port == dnll.list[i].Port { |
|||
dnll.list[i] = loc |
|||
return |
|||
} |
|||
} |
|||
dnll.list = append(dnll.list, loc) |
|||
} |
|||
|
|||
func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { |
|||
for i, dnl := range dnll.list { |
|||
if loc.Ip == dnl.Ip && loc.Port == dnl.Port { |
|||
dnll.list = append(dnll.list[:i], dnll.list[i+1:]...) |
|||
return true |
|||
} |
|||
} |
|||
return false |
|||
} |
|||
|
|||
func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { |
|||
var changed bool |
|||
for _, dnl := range dnll.list { |
|||
if dnl.LastSeen < freshThreshHold { |
|||
changed = true |
|||
break |
|||
} |
|||
} |
|||
if changed { |
|||
var l []*DataNode |
|||
for _, dnl := range dnll.list { |
|||
if dnl.LastSeen >= freshThreshHold { |
|||
l = append(l, dnl) |
|||
} |
|||
} |
|||
dnll.list = l |
|||
} |
|||
} |
|||
@ -1,38 +0,0 @@ |
|||
package util |
|||
|
|||
import ( |
|||
"sync" |
|||
) |
|||
|
|||
// A mostly for read map, which can thread-safely
|
|||
// initialize the map entries.
|
|||
type ConcurrentReadMap struct { |
|||
rmutex sync.RWMutex |
|||
mutex sync.Mutex |
|||
Items map[string]interface{} |
|||
} |
|||
|
|||
func NewConcurrentReadMap() *ConcurrentReadMap { |
|||
return &ConcurrentReadMap{Items: make(map[string]interface{})} |
|||
} |
|||
|
|||
func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) { |
|||
m.mutex.Lock() |
|||
defer m.mutex.Unlock() |
|||
if value, ok := m.Items[key]; ok { |
|||
return value |
|||
} |
|||
value = newEntry() |
|||
m.Items[key] = value |
|||
return value |
|||
} |
|||
|
|||
func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} { |
|||
m.rmutex.RLock() |
|||
if value, ok := m.Items[key]; ok { |
|||
m.rmutex.RUnlock() |
|||
return value |
|||
} |
|||
m.rmutex.RUnlock() |
|||
return m.initMapEntry(key, newEntry) |
|||
} |
|||
@ -1,163 +0,0 @@ |
|||
package util |
|||
|
|||
import ( |
|||
"bytes" |
|||
"encoding/json" |
|||
"errors" |
|||
"fmt" |
|||
"io" |
|||
"io/ioutil" |
|||
"net/http" |
|||
"net/url" |
|||
"strings" |
|||
|
|||
"github.com/chrislusf/seaweedfs/go/security" |
|||
) |
|||
|
|||
var ( |
|||
client *http.Client |
|||
Transport *http.Transport |
|||
) |
|||
|
|||
func init() { |
|||
Transport = &http.Transport{ |
|||
MaxIdleConnsPerHost: 1024, |
|||
} |
|||
client = &http.Client{Transport: Transport} |
|||
} |
|||
|
|||
func PostBytes(url string, body []byte) ([]byte, error) { |
|||
r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body)) |
|||
if err != nil { |
|||
return nil, fmt.Errorf("Post to %s: %v", url, err) |
|||
} |
|||
defer r.Body.Close() |
|||
b, err := ioutil.ReadAll(r.Body) |
|||
if err != nil { |
|||
return nil, fmt.Errorf("Read response body: %v", err) |
|||
} |
|||
return b, nil |
|||
} |
|||
|
|||
func Post(url string, values url.Values) ([]byte, error) { |
|||
r, err := client.PostForm(url, values) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
defer r.Body.Close() |
|||
b, err := ioutil.ReadAll(r.Body) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return b, nil |
|||
} |
|||
|
|||
func Get(url string) ([]byte, error) { |
|||
r, err := client.Get(url) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
defer r.Body.Close() |
|||
b, err := ioutil.ReadAll(r.Body) |
|||
if r.StatusCode != 200 { |
|||
return nil, fmt.Errorf("%s: %s", url, r.Status) |
|||
} |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return b, nil |
|||
} |
|||
|
|||
func Delete(url string, jwt security.EncodedJwt) error { |
|||
req, err := http.NewRequest("DELETE", url, nil) |
|||
if jwt != "" { |
|||
req.Header.Set("Authorization", "BEARER "+string(jwt)) |
|||
} |
|||
if err != nil { |
|||
return err |
|||
} |
|||
resp, e := client.Do(req) |
|||
if e != nil { |
|||
return e |
|||
} |
|||
defer resp.Body.Close() |
|||
body, err := ioutil.ReadAll(resp.Body) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
switch resp.StatusCode { |
|||
case http.StatusNotFound, http.StatusAccepted, http.StatusOK: |
|||
return nil |
|||
} |
|||
m := make(map[string]interface{}) |
|||
if e := json.Unmarshal(body, m); e == nil { |
|||
if s, ok := m["error"].(string); ok { |
|||
return errors.New(s) |
|||
} |
|||
} |
|||
return errors.New(string(body)) |
|||
} |
|||
|
|||
func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error { |
|||
r, err := client.PostForm(url, values) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer r.Body.Close() |
|||
if r.StatusCode != 200 { |
|||
return fmt.Errorf("%s: %s", url, r.Status) |
|||
} |
|||
bufferSize := len(allocatedBytes) |
|||
for { |
|||
n, err := r.Body.Read(allocatedBytes) |
|||
if n == bufferSize { |
|||
eachBuffer(allocatedBytes) |
|||
} |
|||
if err != nil { |
|||
if err == io.EOF { |
|||
return nil |
|||
} |
|||
return err |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error { |
|||
r, err := client.PostForm(url, values) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer r.Body.Close() |
|||
if r.StatusCode != 200 { |
|||
return fmt.Errorf("%s: %s", url, r.Status) |
|||
} |
|||
return readFn(r.Body) |
|||
} |
|||
|
|||
func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) { |
|||
response, err := client.Get(fileUrl) |
|||
if err != nil { |
|||
return "", nil, err |
|||
} |
|||
contentDisposition := response.Header["Content-Disposition"] |
|||
if len(contentDisposition) > 0 { |
|||
if strings.HasPrefix(contentDisposition[0], "filename=") { |
|||
filename = contentDisposition[0][len("filename="):] |
|||
filename = strings.Trim(filename, "\"") |
|||
} |
|||
} |
|||
rc = response.Body |
|||
return |
|||
} |
|||
|
|||
func Do(req *http.Request) (resp *http.Response, err error) { |
|||
return client.Do(req) |
|||
} |
|||
|
|||
func NormalizeUrl(url string) string { |
|||
if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") { |
|||
return url |
|||
} |
|||
return "http://" + url |
|||
} |
|||
@ -1,30 +0,0 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"net/http" |
|||
|
|||
"github.com/chrislusf/seaweedfs/go/stats" |
|||
"github.com/chrislusf/seaweedfs/go/util" |
|||
ui "github.com/chrislusf/seaweedfs/go/weed/weed_server/master_ui" |
|||
) |
|||
|
|||
func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) { |
|||
infos := make(map[string]interface{}) |
|||
infos["Version"] = util.VERSION |
|||
args := struct { |
|||
Version string |
|||
Topology interface{} |
|||
Leader string |
|||
Peers interface{} |
|||
Stats map[string]interface{} |
|||
Counters *stats.ServerStats |
|||
}{ |
|||
util.VERSION, |
|||
ms.Topo.ToMap(), |
|||
ms.Topo.RaftServer.Leader(), |
|||
ms.Topo.RaftServer.Peers(), |
|||
infos, |
|||
serverStats, |
|||
} |
|||
ui.StatusTpl.Execute(w, args) |
|||
} |
|||
@ -1,50 +0,0 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"net/http" |
|||
"path/filepath" |
|||
|
|||
"github.com/chrislusf/seaweedfs/go/glog" |
|||
"github.com/chrislusf/seaweedfs/go/stats" |
|||
"github.com/chrislusf/seaweedfs/go/util" |
|||
) |
|||
|
|||
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { |
|||
m := make(map[string]interface{}) |
|||
m["Version"] = util.VERSION |
|||
m["Volumes"] = vs.store.Status() |
|||
writeJsonQuiet(w, r, http.StatusOK, m) |
|||
} |
|||
|
|||
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { |
|||
err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("replication"), r.FormValue("ttl")) |
|||
if err == nil { |
|||
writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) |
|||
} else { |
|||
writeJsonError(w, r, http.StatusNotAcceptable, err) |
|||
} |
|||
glog.V(2).Infoln("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replication =", r.FormValue("replication"), ", error =", err) |
|||
} |
|||
|
|||
func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.Request) { |
|||
err := vs.store.DeleteCollection(r.FormValue("collection")) |
|||
if err == nil { |
|||
writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) |
|||
} else { |
|||
writeJsonError(w, r, http.StatusInternalServerError, err) |
|||
} |
|||
glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err) |
|||
} |
|||
|
|||
func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) { |
|||
m := make(map[string]interface{}) |
|||
m["Version"] = util.VERSION |
|||
var ds []*stats.DiskStatus |
|||
for _, loc := range vs.store.Locations { |
|||
if dir, e := filepath.Abs(loc.Directory); e == nil { |
|||
ds = append(ds, stats.NewDiskStatus(dir)) |
|||
} |
|||
} |
|||
m["DiskStatuses"] = ds |
|||
writeJsonQuiet(w, r, http.StatusOK, m) |
|||
} |
|||
@ -1,86 +0,0 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"fmt" |
|||
"net/http" |
|||
|
|||
"github.com/chrislusf/seaweedfs/go/glog" |
|||
"github.com/chrislusf/seaweedfs/go/storage" |
|||
"github.com/chrislusf/seaweedfs/go/util" |
|||
) |
|||
|
|||
func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) { |
|||
v, err := vs.getVolume("volume", r) |
|||
if v == nil { |
|||
writeJsonError(w, r, http.StatusBadRequest, err) |
|||
return |
|||
} |
|||
syncStat := v.GetVolumeSyncStatus() |
|||
if syncStat.Error != "" { |
|||
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Get Volume %d status error: %s", v.Id, syncStat.Error)) |
|||
glog.V(2).Infoln("getVolumeSyncStatusHandler volume =", r.FormValue("volume"), ", error =", err) |
|||
} else { |
|||
writeJsonQuiet(w, r, http.StatusOK, syncStat) |
|||
} |
|||
} |
|||
|
|||
func (vs *VolumeServer) getVolumeIndexContentHandler(w http.ResponseWriter, r *http.Request) { |
|||
v, err := vs.getVolume("volume", r) |
|||
if v == nil { |
|||
writeJsonError(w, r, http.StatusBadRequest, err) |
|||
return |
|||
} |
|||
content, err := v.IndexFileContent() |
|||
if err != nil { |
|||
writeJsonError(w, r, http.StatusInternalServerError, err) |
|||
return |
|||
} |
|||
w.Write(content) |
|||
} |
|||
|
|||
func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *http.Request) { |
|||
v, err := vs.getVolume("volume", r) |
|||
if v == nil { |
|||
writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("Not Found volume: %v", err)) |
|||
return |
|||
} |
|||
if int(v.SuperBlock.CompactRevision) != util.ParseInt(r.FormValue("revision"), 0) { |
|||
writeJsonError(w, r, http.StatusExpectationFailed, fmt.Errorf("Requested Volume Revision is %s, but current revision is %d", r.FormValue("revision"), v.SuperBlock.CompactRevision)) |
|||
return |
|||
} |
|||
offset := uint32(util.ParseUint64(r.FormValue("offset"), 0)) |
|||
size := uint32(util.ParseUint64(r.FormValue("size"), 0)) |
|||
content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size) |
|||
if err != nil { |
|||
writeJsonError(w, r, http.StatusInternalServerError, err) |
|||
return |
|||
} |
|||
|
|||
id := util.ParseUint64(r.FormValue("id"), 0) |
|||
n := new(storage.Needle) |
|||
n.ParseNeedleHeader(content) |
|||
if id != n.Id { |
|||
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id)) |
|||
return |
|||
} |
|||
|
|||
w.Write(content) |
|||
} |
|||
|
|||
func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) { |
|||
volumeIdString := r.FormValue(volumeParameterName) |
|||
if volumeIdString == "" { |
|||
err := fmt.Errorf("Empty Volume Id: Need to pass in %s=the_volume_id.", volumeParameterName) |
|||
return nil, err |
|||
} |
|||
vid, err := storage.NewVolumeId(volumeIdString) |
|||
if err != nil { |
|||
err = fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) |
|||
return nil, err |
|||
} |
|||
v := vs.store.GetVolume(vid) |
|||
if v == nil { |
|||
return nil, fmt.Errorf("Not Found Volume Id %s: %d", volumeIdString, vid) |
|||
} |
|||
return v, nil |
|||
} |
|||
@ -0,0 +1,190 @@ |
|||
{ |
|||
"ImportPath": "github.com/chrislusf/seaweedfs/weed", |
|||
"GoVersion": "go1.6", |
|||
"GodepVersion": "v60", |
|||
"Deps": [ |
|||
{ |
|||
"ImportPath": "bazil.org/fuse", |
|||
"Rev": "e1ba3783de6cb92c16f72802a4bb94fe639a8049" |
|||
}, |
|||
{ |
|||
"ImportPath": "bazil.org/fuse/fs", |
|||
"Rev": "e1ba3783de6cb92c16f72802a4bb94fe639a8049" |
|||
}, |
|||
{ |
|||
"ImportPath": "bazil.org/fuse/fuseutil", |
|||
"Rev": "e1ba3783de6cb92c16f72802a4bb94fe639a8049" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/boltdb/bolt", |
|||
"Comment": "v1.1.0-22-g6e1ca38", |
|||
"Rev": "6e1ca38c6a73025366cd8705553b404746ee6e63" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/chrislusf/raft", |
|||
"Rev": "90f631ee823c83f594f27257bab64911190856af" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/chrislusf/raft/protobuf", |
|||
"Rev": "90f631ee823c83f594f27257bab64911190856af" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/dgrijalva/jwt-go", |
|||
"Comment": "v2.4.0", |
|||
"Rev": "f164e17f59b82642a3895ba065c385db6c547344" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/disintegration/imaging", |
|||
"Rev": "546cb3c5137b3f1232e123a26aa033aade6b3066" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/gocql/gocql", |
|||
"Comment": "1st_gen_framing-383-g87cc185", |
|||
"Rev": "87cc1854b57c7a4d8f4ae1d0cc358ed6ecb0f8c3" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/gocql/gocql/internal/lru", |
|||
"Comment": "1st_gen_framing-383-g87cc185", |
|||
"Rev": "87cc1854b57c7a4d8f4ae1d0cc358ed6ecb0f8c3" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/gocql/gocql/internal/murmur", |
|||
"Comment": "1st_gen_framing-383-g87cc185", |
|||
"Rev": "87cc1854b57c7a4d8f4ae1d0cc358ed6ecb0f8c3" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/gocql/gocql/internal/streams", |
|||
"Comment": "1st_gen_framing-383-g87cc185", |
|||
"Rev": "87cc1854b57c7a4d8f4ae1d0cc358ed6ecb0f8c3" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/gogo/protobuf/proto", |
|||
"Comment": "v0.1-78-gd3235f0", |
|||
"Rev": "d3235f01ecae4901dd9f7ea6af57a352c0189deb" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/golang/protobuf/proto", |
|||
"Rev": "8d92cf5fc15a4382f8964b08e1f42a75c0591aa3" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/golang/snappy", |
|||
"Rev": "723cc1e459b8eea2dea4583200fd60757d40097a" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/gorilla/context", |
|||
"Rev": "1ea25387ff6f684839d82767c1733ff4d4d15d0a" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/gorilla/mux", |
|||
"Comment": "v1.1", |
|||
"Rev": "0eeaf8392f5b04950925b8a69fe70f110fa7cbfc" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/hailocab/go-hostpool", |
|||
"Rev": "0637eae892be221164aff5fcbccc57171aea6406" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/klauspost/crc32", |
|||
"Comment": "v1.0", |
|||
"Rev": "19b0b332c9e4516a6370a0456e6182c3b5036720" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/pierrec/lz4", |
|||
"Rev": "0b67ae4bb1ab03691079e38dddbc3909d68de64f" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/pierrec/xxHash/xxHash32", |
|||
"Rev": "122b94d6aa20d9b33d3989da6d7cd6cf641d2277" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/rwcarlsen/goexif/exif", |
|||
"Rev": "709fab3d192d7c62f86043caff1e7e3fb0f42bd8" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/rwcarlsen/goexif/tiff", |
|||
"Rev": "709fab3d192d7c62f86043caff1e7e3fb0f42bd8" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/syndtr/goleveldb/leveldb", |
|||
"Rev": "1a9d62f03ea92815b46fcaab357cfd4df264b1a0" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/syndtr/goleveldb/leveldb/cache", |
|||
"Rev": "1a9d62f03ea92815b46fcaab357cfd4df264b1a0" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/syndtr/goleveldb/leveldb/comparer", |
|||
"Rev": "1a9d62f03ea92815b46fcaab357cfd4df264b1a0" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/syndtr/goleveldb/leveldb/errors", |
|||
"Rev": "1a9d62f03ea92815b46fcaab357cfd4df264b1a0" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/syndtr/goleveldb/leveldb/filter", |
|||
"Rev": "1a9d62f03ea92815b46fcaab357cfd4df264b1a0" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/syndtr/goleveldb/leveldb/iterator", |
|||
"Rev": "1a9d62f03ea92815b46fcaab357cfd4df264b1a0" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/syndtr/goleveldb/leveldb/journal", |
|||
"Rev": "1a9d62f03ea92815b46fcaab357cfd4df264b1a0" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/syndtr/goleveldb/leveldb/memdb", |
|||
"Rev": "1a9d62f03ea92815b46fcaab357cfd4df264b1a0" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/syndtr/goleveldb/leveldb/opt", |
|||
"Rev": "1a9d62f03ea92815b46fcaab357cfd4df264b1a0" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/syndtr/goleveldb/leveldb/storage", |
|||
"Rev": "1a9d62f03ea92815b46fcaab357cfd4df264b1a0" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/syndtr/goleveldb/leveldb/table", |
|||
"Rev": "1a9d62f03ea92815b46fcaab357cfd4df264b1a0" |
|||
}, |
|||
{ |
|||
"ImportPath": "github.com/syndtr/goleveldb/leveldb/util", |
|||
"Rev": "1a9d62f03ea92815b46fcaab357cfd4df264b1a0" |
|||
}, |
|||
{ |
|||
"ImportPath": "golang.org/x/image/bmp", |
|||
"Rev": "baddd3465a05d84a6d8d3507547a91cb188c81ea" |
|||
}, |
|||
{ |
|||
"ImportPath": "golang.org/x/image/tiff", |
|||
"Rev": "baddd3465a05d84a6d8d3507547a91cb188c81ea" |
|||
}, |
|||
{ |
|||
"ImportPath": "golang.org/x/image/tiff/lzw", |
|||
"Rev": "baddd3465a05d84a6d8d3507547a91cb188c81ea" |
|||
}, |
|||
{ |
|||
"ImportPath": "golang.org/x/net/context", |
|||
"Rev": "c764672d0ee39ffd83cfcb375804d3181302b62b" |
|||
}, |
|||
{ |
|||
"ImportPath": "golang.org/x/sys/unix", |
|||
"Rev": "320cb01ddbbf0473674c2585f9b6e245721de355" |
|||
}, |
|||
{ |
|||
"ImportPath": "gopkg.in/bufio.v1", |
|||
"Comment": "v1", |
|||
"Rev": "567b2bfa514e796916c4747494d6ff5132a1dfce" |
|||
}, |
|||
{ |
|||
"ImportPath": "gopkg.in/inf.v0", |
|||
"Rev": "3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4" |
|||
}, |
|||
{ |
|||
"ImportPath": "gopkg.in/redis.v2", |
|||
"Comment": "v2.3.2", |
|||
"Rev": "e6179049628164864e6e84e973cfb56335748dea" |
|||
} |
|||
] |
|||
} |
|||
@ -0,0 +1,5 @@ |
|||
This directory tree is generated automatically by godep. |
|||
|
|||
Please do not edit. |
|||
|
|||
See https://github.com/tools/godep for more information. |
|||
@ -1,7 +1,7 @@ |
|||
package embedded_filer |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/go/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
) |
|||
|
|||
type DirectoryManager interface { |
|||
@ -1,7 +1,5 @@ |
|||
package flat_namespace |
|||
|
|||
import () |
|||
|
|||
type FlatNamespaceStore interface { |
|||
Put(fullFileName string, fid string) (err error) |
|||
Get(fullFileName string) (fid string, err error) |
|||
@ -0,0 +1,29 @@ |
|||
package images |
|||
|
|||
import ( |
|||
"io/ioutil" |
|||
"testing" |
|||
) |
|||
|
|||
func TestResizing(t *testing.T) { |
|||
fname := "sample1.png" |
|||
|
|||
data, _ := ioutil.ReadFile(fname) |
|||
|
|||
new_data, _, _ := Resized(".jpg", data, 500, 0) |
|||
|
|||
ioutil.WriteFile("resized.jpg", new_data, 0644) |
|||
} |
|||
|
|||
func BenchmarkResizing(b *testing.B) { |
|||
fName := "test1.png" |
|||
data, _ := ioutil.ReadFile(fName) |
|||
ext := ".jpg" |
|||
w := 0 |
|||
h := 355 |
|||
b.ResetTimer() |
|||
for i := 0; i < b.N; i++ { |
|||
Resized(ext, data, w, h) |
|||
b.SetBytes(int64(len(data))) |
|||
} |
|||
} |
|||
|
Before Width: 2592 | Height: 1936 | Size: 2.0 MiB After Width: 2592 | Height: 1936 | Size: 2.0 MiB |
@ -0,0 +1,7 @@ |
|||
package main |
|||
|
|||
import "github.com/chrislusf/seaweedfs/weed/weedcmd" |
|||
|
|||
func main() { |
|||
weedcmd.Main() |
|||
} |
|||
@ -0,0 +1,61 @@ |
|||
package operation |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"errors" |
|||
"sort" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
) |
|||
|
|||
type ChunkInfo struct { |
|||
Fid string `json:"fid"` |
|||
Offset int64 `json:"offset"` |
|||
Size int64 `json:"size"` |
|||
} |
|||
|
|||
type ChunkList []*ChunkInfo |
|||
|
|||
type ChunkManifest struct { |
|||
Name string `json:"name,omitempty"` |
|||
Mime string `json:"mime,omitempty"` |
|||
Size int64 `json:"size,omitempty"` |
|||
Chunks ChunkList `json:"chunks,omitempty"` |
|||
} |
|||
|
|||
func (s ChunkList) Len() int { return len(s) } |
|||
func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset } |
|||
func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } |
|||
|
|||
func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) { |
|||
if isGzipped { |
|||
var err error |
|||
if buffer, err = UnGzipData(buffer); err != nil { |
|||
return nil, err |
|||
} |
|||
} |
|||
cm := ChunkManifest{} |
|||
if e := json.Unmarshal(buffer, &cm); e != nil { |
|||
return nil, e |
|||
} |
|||
sort.Sort(cm.Chunks) |
|||
return &cm, nil |
|||
} |
|||
|
|||
func (cm *ChunkManifest) Marshal() ([]byte, error) { |
|||
return json.Marshal(cm) |
|||
} |
|||
|
|||
func (cm *ChunkManifest) DeleteChunks(master, collection string) error { |
|||
deleteError := 0 |
|||
for _, ci := range cm.Chunks { |
|||
if e := DeleteFile(master, ci.Fid, collection, ""); e != nil { |
|||
deleteError++ |
|||
glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master) |
|||
} |
|||
} |
|||
if deleteError > 0 { |
|||
return errors.New("Not all chunks deleted.") |
|||
} |
|||
return nil |
|||
} |
|||
@ -0,0 +1,65 @@ |
|||
package stats |
|||
|
|||
import ( |
|||
"time" |
|||
) |
|||
|
|||
type ServerStats struct { |
|||
Requests *DurationCounter |
|||
Connections *DurationCounter |
|||
AssignRequests *DurationCounter |
|||
ReadRequests *DurationCounter |
|||
WriteRequests *DurationCounter |
|||
DeleteRequests *DurationCounter |
|||
BytesIn *DurationCounter |
|||
BytesOut *DurationCounter |
|||
} |
|||
|
|||
var ( |
|||
ServStats = NewServerStats() |
|||
) |
|||
|
|||
func NewServerStats() *ServerStats { |
|||
return &ServerStats{ |
|||
Requests: NewDurationCounter(), |
|||
Connections: NewDurationCounter(), |
|||
AssignRequests: NewDurationCounter(), |
|||
ReadRequests: NewDurationCounter(), |
|||
WriteRequests: NewDurationCounter(), |
|||
DeleteRequests: NewDurationCounter(), |
|||
BytesIn: NewDurationCounter(), |
|||
BytesOut: NewDurationCounter(), |
|||
} |
|||
} |
|||
|
|||
func ConnectionOpen() { |
|||
ServStats.Connections.Add(NewTimedValue(time.Now(), 1)) |
|||
} |
|||
func ConnectionClose() { |
|||
ServStats.Connections.Add(NewTimedValue(time.Now(), -1)) |
|||
} |
|||
func RequestOpen() { |
|||
ServStats.Requests.Add(NewTimedValue(time.Now(), 1)) |
|||
} |
|||
func RequestClose() { |
|||
ServStats.Requests.Add(NewTimedValue(time.Now(), 1)) |
|||
|
|||
} |
|||
func AssignRequest() { |
|||
ServStats.AssignRequests.Add(NewTimedValue(time.Now(), 1)) |
|||
} |
|||
func ReadRequest() { |
|||
ServStats.ReadRequests.Add(NewTimedValue(time.Now(), 1)) |
|||
} |
|||
func WriteRequest() { |
|||
ServStats.WriteRequests.Add(NewTimedValue(time.Now(), 1)) |
|||
} |
|||
func DeleteRequest() { |
|||
ServStats.DeleteRequests.Add(NewTimedValue(time.Now(), 1)) |
|||
} |
|||
func BytesIn(val int64) { |
|||
ServStats.BytesIn.Add(NewTimedValue(time.Now(), 1)) |
|||
} |
|||
func BytesOut(val int64) { |
|||
ServStats.BytesOut.Add(NewTimedValue(time.Now(), 1)) |
|||
} |
|||
@ -0,0 +1,117 @@ |
|||
package storage |
|||
|
|||
import "github.com/chrislusf/seaweedfs/weed/weedpb" |
|||
|
|||
type SettingKey int |
|||
|
|||
const ( |
|||
keyReplicatePlacement SettingKey = iota |
|||
keyGarbageThreshold |
|||
) |
|||
|
|||
type CollectionSettings struct { |
|||
settings map[string]map[SettingKey]interface{} |
|||
} |
|||
|
|||
func NewCollectionSettings(defaultReplicatePlacement, defaultGarbageThreshold string) *CollectionSettings { |
|||
rp, e := NewReplicaPlacementFromString(defaultReplicatePlacement) |
|||
if e != nil { |
|||
rp, _ = NewReplicaPlacementFromString("000") |
|||
} |
|||
c := &CollectionSettings{ |
|||
settings: make(map[string]map[SettingKey]interface{}), |
|||
} |
|||
c.set("", keyReplicatePlacement, rp) |
|||
c.set("", keyGarbageThreshold, defaultGarbageThreshold) |
|||
return c |
|||
} |
|||
|
|||
func NewCollectionSettingsFromPbMessage(msg []*weedpb.CollectionSetting) *CollectionSettings { |
|||
c := &CollectionSettings{ |
|||
settings: make(map[string]map[SettingKey]interface{}), |
|||
} |
|||
for _, m := range msg { |
|||
c.SetGarbageThreshold(m.Collection, m.VacuumGarbageThreshold) |
|||
c.SetReplicaPlacement(m.Collection, m.ReplicaPlacement) |
|||
} |
|||
return c |
|||
} |
|||
|
|||
func (cs *CollectionSettings) ToPbMessage() []*weedpb.CollectionSetting { |
|||
msg := make([]*weedpb.CollectionSetting, 0, len(cs.settings)) |
|||
for collection, m := range cs.settings { |
|||
setting := &weedpb.CollectionSetting{ |
|||
Collection: collection, |
|||
} |
|||
if v, ok := m[keyReplicatePlacement]; ok && v != nil { |
|||
setting.ReplicaPlacement = v.(*ReplicaPlacement).String() |
|||
} |
|||
if v, ok := m[keyGarbageThreshold]; ok && v != nil { |
|||
setting.VacuumGarbageThreshold = v.(string) |
|||
} |
|||
msg = append(msg, setting) |
|||
} |
|||
return msg |
|||
} |
|||
|
|||
func (cs *CollectionSettings) get(collection string, key SettingKey) interface{} { |
|||
if m, ok := cs.settings[collection]; ok { |
|||
if v, ok := m[key]; ok { |
|||
return v |
|||
} |
|||
} |
|||
if m, ok := cs.settings[""]; ok { |
|||
return m[key] |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (cs *CollectionSettings) set(collection string, key SettingKey, value interface{}) { |
|||
m := cs.settings[collection] |
|||
if m == nil { |
|||
m = make(map[SettingKey]interface{}) |
|||
cs.settings[collection] = m |
|||
} |
|||
if value == nil { |
|||
//mustn't delete default setting
|
|||
if collection != "" { |
|||
delete(m, key) |
|||
} |
|||
} else { |
|||
m[key] = value |
|||
} |
|||
} |
|||
|
|||
func (cs *CollectionSettings) GetGarbageThreshold(collection string) string { |
|||
return cs.get(collection, keyGarbageThreshold).(string) |
|||
} |
|||
|
|||
func (cs *CollectionSettings) SetGarbageThreshold(collection string, gt string) { |
|||
if gt == "" { |
|||
cs.set(collection, keyGarbageThreshold, nil) |
|||
} else { |
|||
cs.set(collection, keyGarbageThreshold, gt) |
|||
|
|||
} |
|||
} |
|||
|
|||
func (cs *CollectionSettings) GetReplicaPlacement(collection string) *ReplicaPlacement { |
|||
v := cs.get(collection, keyReplicatePlacement) |
|||
if v == nil { |
|||
return nil |
|||
} else { |
|||
return v.(*ReplicaPlacement) |
|||
} |
|||
} |
|||
|
|||
func (cs *CollectionSettings) SetReplicaPlacement(collection, t string) error { |
|||
if t == "" { |
|||
cs.set(collection, keyReplicatePlacement, nil) |
|||
return nil |
|||
} |
|||
rp, e := NewReplicaPlacementFromString(t) |
|||
if e == nil { |
|||
cs.set(collection, keyReplicatePlacement, rp) |
|||
} |
|||
return e |
|||
} |
|||
@ -0,0 +1,32 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"reflect" |
|||
"testing" |
|||
) |
|||
|
|||
func TestCollectionSettings(t *testing.T) { |
|||
cs1 := NewCollectionSettings("000", "0.3") |
|||
cs1.SetReplicaPlacement("col1", "001") |
|||
cs1.SetGarbageThreshold("col2", "0.5") |
|||
|
|||
if cs1.GetGarbageThreshold("col1") != "0.3" || |
|||
cs1.GetGarbageThreshold("col2") != "0.5" || |
|||
cs1.GetGarbageThreshold("") != "0.3" || |
|||
cs1.GetReplicaPlacement("").String() != "000" || |
|||
cs1.GetReplicaPlacement("col1").String() != "001" || |
|||
cs1.GetReplicaPlacement("col2").String() != "000" { |
|||
t.Fatal("Value incorrect.") |
|||
} |
|||
pb := cs1.ToPbMessage() |
|||
if buf, e := json.MarshalIndent(pb, "", "\t"); e == nil { |
|||
t.Log(string(buf)) |
|||
} else { |
|||
t.Fatal(e) |
|||
} |
|||
cs2 := NewCollectionSettingsFromPbMessage(pb) |
|||
if !reflect.DeepEqual(cs1, cs2) { |
|||
t.Fatal("PbMessage convert incorrect.") |
|||
} |
|||
} |
|||
@ -0,0 +1,126 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"io/ioutil" |
|||
"strings" |
|||
"sync" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
) |
|||
|
|||
// DiskLocation is concurrent safe
|
|||
type DiskLocation struct { |
|||
Directory string |
|||
MaxVolumeCount int |
|||
volumes map[VolumeId]*Volume |
|||
mutex sync.RWMutex |
|||
} |
|||
|
|||
func NewDiskLocation(dir string, maxVolCount int) *DiskLocation { |
|||
return &DiskLocation{ |
|||
Directory: dir, |
|||
MaxVolumeCount: maxVolCount, |
|||
volumes: make(map[VolumeId]*Volume), |
|||
} |
|||
} |
|||
|
|||
func (l *DiskLocation) LoadExistingVolumes(needleMapKind NeedleMapType) { |
|||
if dirs, err := ioutil.ReadDir(l.Directory); err == nil { |
|||
for _, dir := range dirs { |
|||
name := dir.Name() |
|||
if !dir.IsDir() && strings.HasSuffix(name, ".dat") { |
|||
collection := "" |
|||
base := name[:len(name)-len(".dat")] |
|||
i := strings.LastIndex(base, "_") |
|||
if i > 0 { |
|||
collection, base = base[0:i], base[i+1:] |
|||
} |
|||
if vid, err := NewVolumeId(base); err == nil { |
|||
if !l.HasVolume(vid) { |
|||
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil); e == nil { |
|||
l.AddVolume(vid, v) |
|||
glog.V(1).Infof("data file %s, v=%d size=%d ttl=%s", l.Directory+"/"+name, v.Version(), v.Size(), v.Ttl.String()) |
|||
} else { |
|||
glog.V(0).Infof("new volume %s error %s", name, e) |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
glog.V(0).Infoln("Store started on dir:", l.Directory, "with", l.VolumeCount(), "volumes", "max", l.MaxVolumeCount) |
|||
} |
|||
|
|||
func (l *DiskLocation) AddVolume(vid VolumeId, v *Volume) { |
|||
l.mutex.Lock() |
|||
defer l.mutex.Unlock() |
|||
l.volumes[vid] = v |
|||
} |
|||
|
|||
func (l *DiskLocation) DeleteVolume(vid VolumeId) (e error) { |
|||
l.mutex.Lock() |
|||
defer l.mutex.Unlock() |
|||
if v, ok := l.volumes[vid]; ok { |
|||
e = v.Destroy() |
|||
} |
|||
delete(l.volumes, vid) |
|||
return |
|||
} |
|||
|
|||
func (l *DiskLocation) DeleteCollection(collection string) (e error) { |
|||
l.mutex.Lock() |
|||
defer l.mutex.Unlock() |
|||
for k, v := range l.volumes { |
|||
if v.Collection == collection { |
|||
e = v.Destroy() |
|||
if e != nil { |
|||
return |
|||
} |
|||
delete(l.volumes, k) |
|||
} |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (l *DiskLocation) HasVolume(vid VolumeId) bool { |
|||
l.mutex.RLock() |
|||
defer l.mutex.RUnlock() |
|||
_, ok := l.volumes[vid] |
|||
return ok |
|||
} |
|||
|
|||
func (l *DiskLocation) GetVolume(vid VolumeId) (v *Volume, ok bool) { |
|||
l.mutex.RLock() |
|||
defer l.mutex.RUnlock() |
|||
v, ok = l.volumes[vid] |
|||
return |
|||
} |
|||
|
|||
func (l *DiskLocation) VolumeCount() int { |
|||
l.mutex.RLock() |
|||
defer l.mutex.RUnlock() |
|||
return len(l.volumes) |
|||
} |
|||
|
|||
func (l *DiskLocation) CloseAllVolume() { |
|||
l.mutex.RLock() |
|||
defer l.mutex.RUnlock() |
|||
for _, v := range l.volumes { |
|||
v.Close() |
|||
} |
|||
} |
|||
|
|||
// break walk when walker fuc return an error
|
|||
type VolumeWalker func(v *Volume) (e error) |
|||
|
|||
// must not add or delete volume in walker
|
|||
func (l *DiskLocation) WalkVolume(vw VolumeWalker) (e error) { |
|||
l.mutex.RLock() |
|||
defer l.mutex.RUnlock() |
|||
for _, v := range l.volumes { |
|||
if e = vw(v); e != nil { |
|||
return e |
|||
} |
|||
} |
|||
return |
|||
} |
|||
@ -0,0 +1,134 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"fmt" |
|||
"io/ioutil" |
|||
"os" |
|||
"sync" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
type NeedleMapType int |
|||
|
|||
const ( |
|||
NeedleMapInMemory NeedleMapType = iota |
|||
NeedleMapLevelDb |
|||
NeedleMapBoltDb |
|||
) |
|||
|
|||
type NeedleMapper interface { |
|||
Put(key uint64, offset uint32, size uint32) error |
|||
Get(key uint64) (element *NeedleValue, ok bool) |
|||
Delete(key uint64) error |
|||
Close() |
|||
Destroy() error |
|||
ContentSize() uint64 |
|||
DeletedSize() uint64 |
|||
FileCount() int |
|||
DeletedCount() int |
|||
MaxFileKey() uint64 |
|||
IndexFileSize() uint64 |
|||
IndexFileContent() ([]byte, error) |
|||
IndexFileName() string |
|||
} |
|||
|
|||
type baseNeedleMapper struct { |
|||
indexFile *os.File |
|||
mutex sync.RWMutex |
|||
deletionCounter int |
|||
fileCounter int |
|||
deletionByteCounter uint64 |
|||
fileByteCounter uint64 |
|||
maximumFileKey uint64 |
|||
} |
|||
|
|||
func (nm *baseNeedleMapper) IndexFileSize() uint64 { |
|||
nm.mutex.RLock() |
|||
defer nm.mutex.RUnlock() |
|||
stat, err := nm.indexFile.Stat() |
|||
if err == nil { |
|||
return uint64(stat.Size()) |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func (nm *baseNeedleMapper) IndexFileName() string { |
|||
nm.mutex.RLock() |
|||
defer nm.mutex.RUnlock() |
|||
return nm.indexFile.Name() |
|||
} |
|||
|
|||
func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) { |
|||
key = util.BytesToUint64(bytes[:8]) |
|||
offset = util.BytesToUint32(bytes[8:12]) |
|||
size = util.BytesToUint32(bytes[12:16]) |
|||
return |
|||
} |
|||
func (nm *baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error { |
|||
bytes := make([]byte, 16) |
|||
util.Uint64toBytes(bytes[0:8], key) |
|||
util.Uint32toBytes(bytes[8:12], offset) |
|||
util.Uint32toBytes(bytes[12:16], size) |
|||
|
|||
nm.mutex.Lock() |
|||
defer nm.mutex.Unlock() |
|||
if _, err := nm.indexFile.Seek(0, 2); err != nil { |
|||
return fmt.Errorf("cannot seek end of indexfile %s: %v", |
|||
nm.indexFile.Name(), err) |
|||
} |
|||
_, err := nm.indexFile.Write(bytes) |
|||
return err |
|||
} |
|||
func (nm *baseNeedleMapper) IndexFileContent() ([]byte, error) { |
|||
nm.mutex.RLock() |
|||
defer nm.mutex.RUnlock() |
|||
return ioutil.ReadFile(nm.indexFile.Name()) |
|||
} |
|||
|
|||
func (nm *baseNeedleMapper) logDelete(deletedByteCount uint32) { |
|||
nm.mutex.Lock() |
|||
defer nm.mutex.Unlock() |
|||
nm.deletionByteCounter = nm.deletionByteCounter + uint64(deletedByteCount) |
|||
nm.deletionCounter++ |
|||
} |
|||
|
|||
func (nm *baseNeedleMapper) logPut(key uint64, oldSize uint32, newSize uint32) { |
|||
nm.mutex.Lock() |
|||
defer nm.mutex.Unlock() |
|||
if key > nm.maximumFileKey { |
|||
nm.maximumFileKey = key |
|||
} |
|||
nm.fileCounter++ |
|||
nm.fileByteCounter = nm.fileByteCounter + uint64(newSize) |
|||
if oldSize > 0 { |
|||
nm.deletionCounter++ |
|||
nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) |
|||
} |
|||
} |
|||
|
|||
func (nm *baseNeedleMapper) ContentSize() uint64 { |
|||
nm.mutex.RLock() |
|||
defer nm.mutex.RUnlock() |
|||
return nm.fileByteCounter |
|||
} |
|||
func (nm *baseNeedleMapper) DeletedSize() uint64 { |
|||
nm.mutex.RLock() |
|||
defer nm.mutex.RUnlock() |
|||
return nm.deletionByteCounter |
|||
} |
|||
func (nm *baseNeedleMapper) FileCount() int { |
|||
nm.mutex.RLock() |
|||
defer nm.mutex.RUnlock() |
|||
return nm.fileCounter |
|||
} |
|||
func (nm *baseNeedleMapper) DeletedCount() int { |
|||
nm.mutex.RLock() |
|||
defer nm.mutex.RUnlock() |
|||
return nm.deletionCounter |
|||
} |
|||
func (nm *baseNeedleMapper) MaxFileKey() uint64 { |
|||
nm.mutex.RLock() |
|||
defer nm.mutex.RUnlock() |
|||
return nm.maximumFileKey |
|||
} |
|||
@ -0,0 +1,450 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"errors" |
|||
"fmt" |
|||
"math/rand" |
|||
"strconv" |
|||
"strings" |
|||
|
|||
"sync" |
|||
|
|||
"encoding/json" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/operation" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/chrislusf/seaweedfs/weed/weedpb" |
|||
) |
|||
|
|||
const ( |
|||
MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
|
|||
) |
|||
|
|||
type MasterNodes struct { |
|||
nodes []string |
|||
master string |
|||
mutex sync.RWMutex |
|||
} |
|||
|
|||
func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) { |
|||
mn = &MasterNodes{nodes: []string{bootstrapNode}} |
|||
return |
|||
} |
|||
|
|||
func (mn *MasterNodes) String() string { |
|||
mn.mutex.RLock() |
|||
defer mn.mutex.RUnlock() |
|||
return fmt.Sprintf("nodes:%v, master:%s", mn.nodes, mn.master) |
|||
} |
|||
|
|||
func (mn *MasterNodes) Reset() { |
|||
glog.V(4).Infof("Resetting master nodes: %v", mn) |
|||
mn.mutex.Lock() |
|||
defer mn.mutex.Unlock() |
|||
if len(mn.nodes) > 1 && mn.master != "" { |
|||
glog.V(0).Infof("Reset master %s from: %v", mn.master, mn.nodes) |
|||
mn.master = "" |
|||
} |
|||
} |
|||
|
|||
func (mn *MasterNodes) findMaster() (string, error) { |
|||
master := mn.GetMaster() |
|||
if master != "" { |
|||
return master, nil |
|||
} |
|||
mn.mutex.Lock() |
|||
defer mn.mutex.Unlock() |
|||
if len(mn.nodes) == 0 { |
|||
return "", errors.New("No master node found!") |
|||
} |
|||
if mn.master == "" { |
|||
for _, m := range mn.nodes { |
|||
glog.V(4).Infof("Listing masters on %s", m) |
|||
if masters, e := operation.ListMasters(m); e == nil { |
|||
if len(masters) == 0 { |
|||
continue |
|||
} |
|||
mn.nodes = append(masters, m) |
|||
mn.master = mn.nodes[rand.Intn(len(mn.nodes))] |
|||
glog.V(2).Infof("current master nodes is (nodes:%v, master:%s)", mn.nodes, mn.master) |
|||
break |
|||
} else { |
|||
glog.V(4).Infof("Failed listing masters on %s: %v", m, e) |
|||
} |
|||
} |
|||
} |
|||
if mn.master == "" { |
|||
return "", errors.New("No master node available!") |
|||
} |
|||
return mn.master, nil |
|||
} |
|||
|
|||
func (mn *MasterNodes) GetMaster() string { |
|||
mn.mutex.RLock() |
|||
defer mn.mutex.RUnlock() |
|||
return mn.master |
|||
} |
|||
|
|||
/* |
|||
* A VolumeServer contains one Store |
|||
*/ |
|||
type Store struct { |
|||
joinKey string |
|||
ip string |
|||
Port int |
|||
PublicUrl string |
|||
Locations []*DiskLocation |
|||
colSettings *CollectionSettings |
|||
dataCenter string //optional informaton, overwriting master setting if exists
|
|||
rack string //optional information, overwriting master setting if exists
|
|||
volumeSizeLimit uint64 //read from the master
|
|||
masterNodes *MasterNodes |
|||
needleMapKind NeedleMapType |
|||
TaskManager *TaskManager |
|||
mutex sync.RWMutex |
|||
} |
|||
|
|||
func (s *Store) String() (str string) { |
|||
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, joinKey:%v, volumeSizeLimit:%d, masterNodes:%s", |
|||
s.GetIP(), s.Port, s.PublicUrl, s.dataCenter, s.rack, s.GetJoinKey(), s.GetVolumeSizeLimit(), s.masterNodes) |
|||
return |
|||
} |
|||
|
|||
func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { |
|||
s = &Store{ |
|||
Port: port, |
|||
ip: ip, |
|||
PublicUrl: publicUrl, |
|||
TaskManager: NewTaskManager(), |
|||
needleMapKind: needleMapKind, |
|||
} |
|||
s.Locations = make([]*DiskLocation, 0) |
|||
for i := 0; i < len(dirnames); i++ { |
|||
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i]) |
|||
location.LoadExistingVolumes(needleMapKind) |
|||
s.Locations = append(s.Locations, location) |
|||
} |
|||
return |
|||
} |
|||
func (s *Store) AddVolume(volumeListString string, collection string, ttlString string) error { |
|||
ttl, e := ReadTTL(ttlString) |
|||
if e != nil { |
|||
return e |
|||
} |
|||
for _, range_string := range strings.Split(volumeListString, ",") { |
|||
if strings.Index(range_string, "-") < 0 { |
|||
id_string := range_string |
|||
id, err := NewVolumeId(id_string) |
|||
if err != nil { |
|||
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) |
|||
} |
|||
e = s.addVolume(VolumeId(id), collection, ttl) |
|||
} else { |
|||
pair := strings.Split(range_string, "-") |
|||
start, start_err := strconv.ParseUint(pair[0], 10, 64) |
|||
if start_err != nil { |
|||
return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0]) |
|||
} |
|||
end, end_err := strconv.ParseUint(pair[1], 10, 64) |
|||
if end_err != nil { |
|||
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) |
|||
} |
|||
for id := start; id <= end; id++ { |
|||
if err := s.addVolume(VolumeId(id), collection, ttl); err != nil { |
|||
e = err |
|||
} |
|||
} |
|||
} |
|||
} |
|||
return e |
|||
} |
|||
func (s *Store) DeleteCollection(collection string) (e error) { |
|||
for _, location := range s.Locations { |
|||
location.DeleteCollection(collection) |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (s *Store) findVolume(vid VolumeId) *Volume { |
|||
for _, location := range s.Locations { |
|||
if v, found := location.GetVolume(vid); found { |
|||
return v |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
func (s *Store) findFreeLocation() (ret *DiskLocation) { |
|||
max := 0 |
|||
for _, location := range s.Locations { |
|||
currentFreeCount := location.MaxVolumeCount - location.VolumeCount() |
|||
if currentFreeCount > max { |
|||
max = currentFreeCount |
|||
ret = location |
|||
} |
|||
} |
|||
return ret |
|||
} |
|||
func (s *Store) addVolume(vid VolumeId, collection string, ttl *TTL) error { |
|||
if s.findVolume(vid) != nil { |
|||
return fmt.Errorf("Volume Id %d already exists!", vid) |
|||
} |
|||
if location := s.findFreeLocation(); location != nil { |
|||
glog.V(0).Infof("In dir %s adds volume:%v collection:%s ttl:%v", |
|||
location.Directory, vid, collection, ttl) |
|||
if volume, err := NewVolume(location.Directory, collection, vid, s.needleMapKind, ttl); err == nil { |
|||
location.AddVolume(vid, volume) |
|||
return nil |
|||
} else { |
|||
return err |
|||
} |
|||
} |
|||
return fmt.Errorf("No more free space left") |
|||
} |
|||
|
|||
func (s *Store) Status() []*VolumeInfo { |
|||
var stats []*VolumeInfo |
|||
for _, location := range s.Locations { |
|||
location.WalkVolume(func(v *Volume) (e error) { |
|||
s := &VolumeInfo{ |
|||
Id: VolumeId(v.Id), |
|||
Size: v.ContentSize(), |
|||
Collection: v.Collection, |
|||
Version: v.Version(), |
|||
FileCount: v.nm.FileCount(), |
|||
DeleteCount: v.nm.DeletedCount(), |
|||
DeletedByteCount: v.nm.DeletedSize(), |
|||
ReadOnly: v.IsReadOnly(), |
|||
Ttl: v.Ttl} |
|||
stats = append(stats, s) |
|||
return nil |
|||
}) |
|||
} |
|||
sortVolumeInfos(stats) |
|||
return stats |
|||
} |
|||
|
|||
func (s *Store) SetDataCenter(dataCenter string) { |
|||
s.dataCenter = dataCenter |
|||
} |
|||
func (s *Store) SetRack(rack string) { |
|||
s.rack = rack |
|||
} |
|||
|
|||
func (s *Store) SetBootstrapMaster(bootstrapMaster string) { |
|||
s.mutex.Lock() |
|||
defer s.mutex.Unlock() |
|||
s.masterNodes = NewMasterNodes(bootstrapMaster) |
|||
} |
|||
|
|||
type SettingChanged func(s *weedpb.JoinResponse) |
|||
|
|||
func (s *Store) SendHeartbeatToMaster(callback SettingChanged) error { |
|||
masterNode, err := s.masterNodes.findMaster() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
var volumeMessages []*weedpb.VolumeInformationMessage |
|||
maxVolumeCount := 0 |
|||
var maxFileKey uint64 |
|||
for _, location := range s.Locations { |
|||
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount |
|||
volumeToDelete := []VolumeId{} |
|||
location.WalkVolume(func(v *Volume) (e error) { |
|||
if maxFileKey < v.nm.MaxFileKey() { |
|||
maxFileKey = v.nm.MaxFileKey() |
|||
} |
|||
if !v.expired(s.GetVolumeSizeLimit()) { |
|||
volumeMessage := &weedpb.VolumeInformationMessage{ |
|||
Id: uint32(v.Id), |
|||
Size: uint64(v.Size()), |
|||
Collection: v.Collection, |
|||
FileCount: uint64(v.nm.FileCount()), |
|||
DeleteCount: uint64(v.nm.DeletedCount()), |
|||
DeletedByteCount: v.nm.DeletedSize(), |
|||
ReadOnly: v.IsReadOnly(), |
|||
Version: uint32(v.Version()), |
|||
Ttl: v.Ttl.ToUint32(), |
|||
} |
|||
volumeMessages = append(volumeMessages, volumeMessage) |
|||
} else { |
|||
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { |
|||
volumeToDelete = append(volumeToDelete, v.Id) |
|||
glog.V(0).Infoln("volume", v.Id, "is deleted.") |
|||
} else { |
|||
glog.V(0).Infoln("volume", v.Id, "is expired.") |
|||
} |
|||
} |
|||
return nil |
|||
}) |
|||
for _, vid := range volumeToDelete { |
|||
location.DeleteVolume(vid) |
|||
} |
|||
} |
|||
|
|||
joinMsgV2 := &weedpb.JoinMessageV2{ |
|||
JoinKey: s.GetJoinKey(), |
|||
Ip: s.GetIP(), |
|||
Port: uint32(s.Port), |
|||
PublicUrl: s.PublicUrl, |
|||
MaxVolumeCount: uint32(maxVolumeCount), |
|||
MaxFileKey: maxFileKey, |
|||
DataCenter: s.dataCenter, |
|||
Rack: s.rack, |
|||
Volumes: volumeMessages, |
|||
} |
|||
ret := &weedpb.JoinResponse{} |
|||
joinUrl := util.MkUrl(masterNode, "/dir/join2", nil) |
|||
glog.V(4).Infof("Sending heartbeat to %s ...", joinUrl) |
|||
if err = util.PostPbMsg(joinUrl, joinMsgV2, ret); err != nil { |
|||
s.masterNodes.Reset() |
|||
return err |
|||
} |
|||
|
|||
if ret.Error != "" { |
|||
s.masterNodes.Reset() |
|||
return errors.New(ret.Error) |
|||
} |
|||
if ret.JoinKey != s.GetJoinKey() { |
|||
if glog.V(4) { |
|||
jsonData, _ := json.Marshal(ret) |
|||
glog.V(4).Infof("dir join sync settings: %v", string(jsonData)) |
|||
} |
|||
s.SetJoinKey(ret.JoinKey) |
|||
if ret.JoinIp != "" { |
|||
s.SetIP(ret.JoinIp) |
|||
} |
|||
if ret.VolumeSizeLimit != 0 { |
|||
s.SetVolumeSizeLimit(ret.VolumeSizeLimit) |
|||
} |
|||
if callback != nil { |
|||
callback(ret) |
|||
} |
|||
if len(ret.CollectionSettings) > 0 { |
|||
cs := NewCollectionSettingsFromPbMessage(ret.CollectionSettings) |
|||
s.SetCollectionSettings(cs) |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
func (s *Store) Close() { |
|||
for _, location := range s.Locations { |
|||
location.CloseAllVolume() |
|||
} |
|||
} |
|||
func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { |
|||
if v := s.findVolume(i); v != nil { |
|||
if v.IsReadOnly() { |
|||
err = fmt.Errorf("Volume %d is read only", i) |
|||
return |
|||
} |
|||
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) { |
|||
size, err = v.write(n) |
|||
} else { |
|||
err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.GetVolumeSizeLimit(), v.ContentSize()) |
|||
} |
|||
if s.GetVolumeSizeLimit() < v.ContentSize()+3*uint64(size) { |
|||
glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.GetVolumeSizeLimit()) |
|||
if e := s.SendHeartbeatToMaster(nil); e != nil { |
|||
glog.V(0).Infoln("error when reporting size:", e) |
|||
} |
|||
} |
|||
return |
|||
} |
|||
glog.V(0).Infoln("volume", i, "not found!") |
|||
err = fmt.Errorf("Volume %d not found!", i) |
|||
return |
|||
} |
|||
func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { |
|||
if v := s.findVolume(i); v != nil && !v.IsReadOnly() { |
|||
return v.delete(n) |
|||
} |
|||
return 0, nil |
|||
} |
|||
func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) { |
|||
if v := s.findVolume(i); v != nil { |
|||
return v.readNeedle(n) |
|||
} |
|||
return 0, fmt.Errorf("Volume %v not found!", i) |
|||
} |
|||
func (s *Store) GetVolume(i VolumeId) *Volume { |
|||
return s.findVolume(i) |
|||
} |
|||
|
|||
func (s *Store) HasVolume(i VolumeId) bool { |
|||
v := s.findVolume(i) |
|||
return v != nil |
|||
} |
|||
|
|||
func (s *Store) WalkVolume(walker VolumeWalker) error { |
|||
for _, location := range s.Locations { |
|||
if e := location.WalkVolume(walker); e != nil { |
|||
return e |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (s *Store) GetVolumeSizeLimit() uint64 { |
|||
s.mutex.RLock() |
|||
defer s.mutex.RUnlock() |
|||
return s.volumeSizeLimit |
|||
} |
|||
|
|||
func (s *Store) SetVolumeSizeLimit(sz uint64) { |
|||
s.mutex.Lock() |
|||
defer s.mutex.Unlock() |
|||
s.volumeSizeLimit = sz |
|||
} |
|||
|
|||
func (s *Store) GetIP() string { |
|||
s.mutex.RLock() |
|||
defer s.mutex.RUnlock() |
|||
return s.ip |
|||
} |
|||
|
|||
func (s *Store) SetIP(ip string) { |
|||
s.mutex.Lock() |
|||
defer s.mutex.Unlock() |
|||
s.ip = ip |
|||
} |
|||
|
|||
func (s *Store) GetJoinKey() string { |
|||
s.mutex.RLock() |
|||
defer s.mutex.RUnlock() |
|||
return s.joinKey |
|||
} |
|||
|
|||
func (s *Store) SetJoinKey(k string) { |
|||
s.mutex.Lock() |
|||
defer s.mutex.Unlock() |
|||
s.joinKey = k |
|||
} |
|||
|
|||
func (s *Store) GetMaster() string { |
|||
return s.masterNodes.GetMaster() |
|||
} |
|||
|
|||
func (s *Store) GetCollectionSettings() *CollectionSettings { |
|||
s.mutex.RLock() |
|||
defer s.mutex.RUnlock() |
|||
return s.colSettings |
|||
} |
|||
|
|||
func (s *Store) SetCollectionSettings(cs *CollectionSettings) { |
|||
s.mutex.Lock() |
|||
defer s.mutex.Unlock() |
|||
s.colSettings = cs |
|||
} |
|||
|
|||
func (s *Store) GetVolumeReplicaPlacement(volumeId VolumeId) *ReplicaPlacement { |
|||
cs := s.GetCollectionSettings() |
|||
if cs == nil { |
|||
return nil |
|||
} |
|||
collection := "" |
|||
if v := s.GetVolume(volumeId); v != nil { |
|||
collection = v.Collection |
|||
} |
|||
return cs.GetReplicaPlacement(collection) |
|||
} |
|||
@ -0,0 +1,160 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"errors" |
|||
"net/url" |
|||
"time" |
|||
|
|||
"sync" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
) |
|||
|
|||
const ( |
|||
TaskVacuum = "vacuum" |
|||
TaskReplicate = "replicate" |
|||
TaskBalance = "balance" |
|||
) |
|||
|
|||
var ( |
|||
ErrTaskNotFinish = errors.New("TaskNotFinish") |
|||
ErrTaskNotFound = errors.New("TaskNotFound") |
|||
ErrTaskInvalid = errors.New("TaskInvalid") |
|||
ErrTaskExists = errors.New("TaskExists") |
|||
) |
|||
|
|||
type TaskWorker interface { |
|||
Run() error |
|||
Commit() error |
|||
Clean() error |
|||
Info() url.Values |
|||
} |
|||
|
|||
type task struct { |
|||
Id string |
|||
startTime time.Time |
|||
worker TaskWorker |
|||
ch chan bool |
|||
result error |
|||
cleanWhenFinish bool |
|||
} |
|||
|
|||
type TaskManager struct { |
|||
taskList map[string]*task |
|||
lock sync.RWMutex |
|||
} |
|||
|
|||
func newTask(worker TaskWorker, id string) *task { |
|||
t := &task{ |
|||
Id: id, |
|||
worker: worker, |
|||
startTime: time.Now(), |
|||
result: ErrTaskNotFinish, |
|||
ch: make(chan bool, 1), |
|||
} |
|||
go func(t *task) { |
|||
t.result = t.worker.Run() |
|||
if t.cleanWhenFinish { |
|||
glog.V(0).Infof("clean task (%s) when finish.", t.Id) |
|||
t.worker.Clean() |
|||
} |
|||
t.ch <- true |
|||
|
|||
}(t) |
|||
return t |
|||
} |
|||
|
|||
func (t *task) queryResult(waitDuration time.Duration) error { |
|||
if t.result == ErrTaskNotFinish && waitDuration > 0 { |
|||
select { |
|||
case <-time.After(waitDuration): |
|||
case <-t.ch: |
|||
} |
|||
} |
|||
return t.result |
|||
} |
|||
|
|||
func NewTaskManager() *TaskManager { |
|||
return &TaskManager{ |
|||
taskList: make(map[string]*task), |
|||
} |
|||
} |
|||
|
|||
func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error) { |
|||
tm.lock.Lock() |
|||
defer tm.lock.Unlock() |
|||
tt := args.Get("task") |
|||
vid := args.Get("volume") |
|||
tid = tt + "-" + vid |
|||
if _, ok := tm.taskList[tid]; ok { |
|||
return tid, ErrTaskExists |
|||
} |
|||
|
|||
var tw TaskWorker |
|||
switch tt { |
|||
case TaskVacuum: |
|||
tw, e = NewVacuumTask(s, args) |
|||
case TaskReplicate: |
|||
tw, e = NewReplicaTask(s, args) |
|||
case TaskBalance: |
|||
} |
|||
if e != nil { |
|||
return |
|||
} |
|||
if tw == nil { |
|||
return "", ErrTaskInvalid |
|||
} |
|||
tm.taskList[tid] = newTask(tw, tid) |
|||
return tid, nil |
|||
} |
|||
|
|||
func (tm *TaskManager) QueryResult(tid string, waitDuration time.Duration) (e error) { |
|||
tm.lock.RLock() |
|||
defer tm.lock.RUnlock() |
|||
t, ok := tm.taskList[tid] |
|||
if !ok { |
|||
return ErrTaskNotFound |
|||
} |
|||
return t.queryResult(waitDuration) |
|||
} |
|||
|
|||
func (tm *TaskManager) Commit(tid string) (e error) { |
|||
tm.lock.Lock() |
|||
defer tm.lock.Unlock() |
|||
t, ok := tm.taskList[tid] |
|||
if !ok { |
|||
return ErrTaskNotFound |
|||
} |
|||
if t.queryResult(time.Second*30) == ErrTaskNotFinish { |
|||
return ErrTaskNotFinish |
|||
} |
|||
delete(tm.taskList, tid) |
|||
return t.worker.Commit() |
|||
} |
|||
|
|||
func (tm *TaskManager) Clean(tid string) error { |
|||
tm.lock.Lock() |
|||
defer tm.lock.Unlock() |
|||
t, ok := tm.taskList[tid] |
|||
if !ok { |
|||
return ErrTaskNotFound |
|||
} |
|||
delete(tm.taskList, tid) |
|||
if t.queryResult(time.Second*30) == ErrTaskNotFinish { |
|||
t.cleanWhenFinish = true |
|||
glog.V(0).Infof("task (%s) is not finish, clean it later.", tid) |
|||
} else { |
|||
t.worker.Clean() |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (tm *TaskManager) ElapsedDuration(tid string) (time.Duration, error) { |
|||
tm.lock.RLock() |
|||
defer tm.lock.RUnlock() |
|||
t, ok := tm.taskList[tid] |
|||
if !ok { |
|||
return 0, ErrTaskNotFound |
|||
} |
|||
return time.Since(t.startTime), nil |
|||
} |
|||
@ -0,0 +1,87 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"errors" |
|||
"fmt" |
|||
"net/url" |
|||
"time" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
type TaskParams map[string]string |
|||
|
|||
var ( |
|||
ErrTaskTimeout = errors.New("TaskTimeout") |
|||
) |
|||
|
|||
type TaskCli struct { |
|||
TID string |
|||
DataNode string |
|||
} |
|||
|
|||
func NewTaskCli(dataNode string, taskType string, params TaskParams) (*TaskCli, error) { |
|||
args := url.Values{} |
|||
args.Set("task", taskType) |
|||
for k, v := range params { |
|||
args.Set(k, v) |
|||
} |
|||
m, e := util.RemoteApiCall(dataNode, "/admin/task/new", args) |
|||
if e != nil { |
|||
return nil, e |
|||
} |
|||
tid := m["tid"].(string) |
|||
if tid == "" { |
|||
return nil, fmt.Errorf("Empty %s task", taskType) |
|||
} |
|||
return &TaskCli{ |
|||
TID: tid, |
|||
DataNode: dataNode, |
|||
}, nil |
|||
} |
|||
|
|||
func (c *TaskCli) WaitAndQueryResult(timeout time.Duration) error { |
|||
startTime := time.Now() |
|||
args := url.Values{} |
|||
args.Set("tid", c.TID) |
|||
args.Set("timeout", time.Minute.String()) |
|||
tryTimes := 0 |
|||
for time.Since(startTime) < timeout { |
|||
_, e := util.RemoteApiCall(c.DataNode, "/admin/task/query", args) |
|||
if e == nil { |
|||
//task have finished and have no error
|
|||
return nil |
|||
} |
|||
if util.IsRemoteApiError(e) { |
|||
if e.Error() == ErrTaskNotFinish.Error() { |
|||
tryTimes = 0 |
|||
continue |
|||
} |
|||
return e |
|||
} else { |
|||
tryTimes++ |
|||
if tryTimes >= 10 { |
|||
return e |
|||
} |
|||
glog.V(0).Infof("query task (%s) error %v, wait 1 minute then retry %d times", c.TID, e, tryTimes) |
|||
time.Sleep(time.Minute) |
|||
} |
|||
|
|||
} |
|||
return ErrTaskTimeout |
|||
} |
|||
|
|||
func (c *TaskCli) Commit() error { |
|||
args := url.Values{} |
|||
args.Set("tid", c.TID) |
|||
_, e := util.RemoteApiCall(c.DataNode, "/admin/task/commit", args) |
|||
return e |
|||
} |
|||
|
|||
func (c *TaskCli) Clean() error { |
|||
args := url.Values{} |
|||
args.Set("tid", c.TID) |
|||
_, e := util.RemoteApiCall(c.DataNode, "/admin/task/clean", args) |
|||
return e |
|||
} |
|||
@ -0,0 +1,115 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"errors" |
|||
"fmt" |
|||
"net/url" |
|||
"os" |
|||
"path" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
type ReplicaTask struct { |
|||
VID VolumeId |
|||
Collection string |
|||
SrcDataNode string |
|||
s *Store |
|||
location *DiskLocation |
|||
} |
|||
|
|||
func NewReplicaTask(s *Store, args url.Values) (*ReplicaTask, error) { |
|||
volumeIdString := args.Get("volume") |
|||
vid, err := NewVolumeId(volumeIdString) |
|||
if err != nil { |
|||
return nil, fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) |
|||
} |
|||
source := args.Get("source") |
|||
if source == "" { |
|||
return nil, errors.New("Invalid source data node.") |
|||
|
|||
} |
|||
location := s.findFreeLocation() |
|||
if location == nil { |
|||
return nil, errors.New("No more free space left") |
|||
} |
|||
collection := args.Get("collection") |
|||
return &ReplicaTask{ |
|||
VID: vid, |
|||
Collection: collection, |
|||
SrcDataNode: source, |
|||
s: s, |
|||
location: location, |
|||
}, nil |
|||
} |
|||
|
|||
func (t *ReplicaTask) Run() error { |
|||
ch := make(chan error) |
|||
go func() { |
|||
idxUrl := util.MkUrl(t.SrcDataNode, "/admin/sync/index", url.Values{"volume": {t.VID.String()}}) |
|||
e := util.DownloadToFile(idxUrl, t.FileName()+".repx") |
|||
if e != nil { |
|||
e = fmt.Errorf("Replicat error: %s, %v", idxUrl, e) |
|||
} |
|||
ch <- e |
|||
}() |
|||
go func() { |
|||
datUrl := util.MkUrl(t.SrcDataNode, "/admin/sync/vol_data", url.Values{"volume": {t.VID.String()}}) |
|||
e := util.DownloadToFile(datUrl, t.FileName()+".repd") |
|||
if e != nil { |
|||
e = fmt.Errorf("Replicat error: %s, %v", datUrl, e) |
|||
} |
|||
ch <- e |
|||
}() |
|||
errs := make([]error, 0) |
|||
for i := 0; i < 2; i++ { |
|||
if e := <-ch; e != nil { |
|||
errs = append(errs, e) |
|||
} |
|||
} |
|||
if len(errs) == 0 { |
|||
return nil |
|||
} else { |
|||
return fmt.Errorf("%v", errs) |
|||
} |
|||
} |
|||
|
|||
func (t *ReplicaTask) Commit() error { |
|||
var ( |
|||
volume *Volume |
|||
e error |
|||
) |
|||
|
|||
if e = os.Rename(t.FileName()+".repd", t.FileName()+".dat"); e != nil { |
|||
return e |
|||
} |
|||
if e = os.Rename(t.FileName()+".repx", t.FileName()+".idx"); e != nil { |
|||
return e |
|||
} |
|||
volume, e = NewVolume(t.location.Directory, t.Collection, t.VID, t.s.needleMapKind, nil) |
|||
if e == nil { |
|||
t.location.AddVolume(t.VID, volume) |
|||
t.s.SendHeartbeatToMaster(nil) |
|||
} |
|||
return e |
|||
} |
|||
|
|||
func (t *ReplicaTask) Clean() error { |
|||
os.Remove(t.FileName() + ".repx") |
|||
os.Remove(t.FileName() + ".repd") |
|||
return nil |
|||
} |
|||
|
|||
func (t *ReplicaTask) Info() url.Values { |
|||
//TODO
|
|||
return url.Values{} |
|||
} |
|||
|
|||
func (t *ReplicaTask) FileName() (fileName string) { |
|||
if t.Collection == "" { |
|||
fileName = path.Join(t.location.Directory, t.VID.String()) |
|||
} else { |
|||
fileName = path.Join(t.location.Directory, t.Collection+"_"+t.VID.String()) |
|||
} |
|||
return |
|||
} |
|||
Some files were not shown because too many files changed in this diff
Write
Preview
Loading…
Cancel
Save
Reference in new issue