Browse Source

change filer API to gRPC

pull/655/head
Chris Lu 7 years ago
parent
commit
43a69d20bf
  1. 23
      weed/command/filer.go
  2. 125
      weed/filer/client_operations.go
  3. 583
      weed/filer/filer.pb.go
  4. 100
      weed/filesys/dir.go
  5. 61
      weed/filesys/file.go
  6. 20
      weed/filesys/wfs.go
  7. 1
      weed/pb/Makefile
  8. 85
      weed/pb/filer.proto
  9. 108
      weed/server/filer_grpc_server.go
  10. 1
      weed/server/filer_server.go
  11. 83
      weed/server/filer_server_handlers_api.go

23
weed/command/filer.go

@ -9,6 +9,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/soheilhy/cmux"
"google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/filer"
"google.golang.org/grpc"
)
var (
@ -99,7 +103,7 @@ func (fo *FilerOptions) start() {
publicVolumeMux = http.NewServeMux()
}
_, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux,
fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux,
*fo.ip, *fo.port, *fo.master, *fo.dir, *fo.collection,
*fo.defaultReplicaPlacement, *fo.redirectOnRead, *fo.disableDirListing,
*fo.confFile,
@ -134,7 +138,22 @@ func (fo *FilerOptions) start() {
if e != nil {
glog.Fatalf("Filer listener error: %v", e)
}
if e := http.Serve(filerListener, defaultMux); e != nil {
m := cmux.New(filerListener)
grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
httpL := m.Match(cmux.Any())
// Create your protocol servers.
grpcS := grpc.NewServer()
filer.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
httpS := &http.Server{Handler: defaultMux}
go grpcS.Serve(grpcL)
go httpS.Serve(httpL)
if err := m.Serve(); err != nil {
glog.Fatalf("Filer Fail to serve: %v", e)
}

125
weed/filer/client_operations.go

@ -1,125 +0,0 @@
package filer
import (
"encoding/json"
"errors"
"fmt"
"net/url"
"github.com/chrislusf/seaweedfs/weed/util"
)
type ApiRequest struct {
Command string //"listFiles", "listDirectories", "getFileSize"
Directory string
FileName string
FileId string
}
type ListFilesResult struct {
Files []FileEntry
Error string `json:"error,omitempty"`
}
func ListFiles(server string, directory string, fileName string) (ret *ListFilesResult, err error) {
ret = new(ListFilesResult)
if err = call(server, ApiRequest{Command: "listFiles", Directory: directory, FileName: fileName}, ret); err == nil {
if ret.Error != "" {
return nil, errors.New(ret.Error)
}
return ret, nil
}
return nil, err
}
type GetFileSizeResult struct {
Size uint64
Error string `json:"error,omitempty"`
}
func GetFileSize(server string, fileId string) (ret *GetFileSizeResult, err error) {
ret = new(GetFileSizeResult)
if err = call(server, ApiRequest{Command: "getFileSize", FileId: fileId}, ret); err == nil {
if ret.Error != "" {
return nil, errors.New(ret.Error)
}
return ret, nil
}
return nil, err
}
type GetFileContentResult struct {
Content []byte
Error string `json:"error,omitempty"`
}
func GetFileContent(server string, fileId string) (ret *GetFileContentResult, err error) {
ret = new(GetFileContentResult)
if err = call(server, ApiRequest{Command: "getFileContent", FileId: fileId}, ret); err == nil {
if ret.Error != "" {
return nil, errors.New(ret.Error)
}
return ret, nil
}
return nil, err
}
type ListDirectoriesResult struct {
Directories []DirectoryName
Error string `json:"error,omitempty"`
}
func ListDirectories(server string, directory string) (ret *ListDirectoriesResult, err error) {
ret = new(ListDirectoriesResult)
if err := call(server, ApiRequest{Command: "listDirectories", Directory: directory}, ret); err == nil {
if ret.Error != "" {
return nil, errors.New(ret.Error)
}
return ret, nil
}
return nil, err
}
type LookupDirectoryEntryResult struct {
Found bool
FileId string
Error string `json:"error,omitempty"`
}
func LookupDirectoryEntry(server string, directory string, name string) (ret *LookupDirectoryEntryResult, err error) {
ret = new(LookupDirectoryEntryResult)
if err := call(server, ApiRequest{Command: "lookupDirectoryEntry", Directory: directory, FileName: name}, ret); err == nil {
if ret.Error != "" {
return nil, errors.New(ret.Error)
}
return ret, nil
}
return nil, err
}
func DeleteDirectoryOrFile(server string, path string, isDir bool) error {
destUrl := fmt.Sprintf("http://%s%s", server, path)
if isDir {
destUrl += "/?recursive=true"
}
return util.Delete(destUrl, "")
}
func call(server string, request ApiRequest, ret interface{}) error {
b, err := json.Marshal(request)
if err != nil {
fmt.Println("error:", err)
return nil
}
values := make(url.Values)
values.Add("request", string(b))
jsonBlob, err := util.Post("http://"+server+"/__api__", values)
if err != nil {
return err
}
err = json.Unmarshal(jsonBlob, ret)
if err != nil {
return err
}
return nil
}

583
weed/filer/filer.pb.go

@ -0,0 +1,583 @@
// Code generated by protoc-gen-go.
// source: filer.proto
// DO NOT EDIT!
/*
Package filer is a generated protocol buffer package.
It is generated from these files:
filer.proto
It has these top-level messages:
LookupDirectoryEntryRequest
LookupDirectoryEntryResponse
ListEntriesRequest
ListEntriesResponse
Entry
FuseAttributes
GetFileAttributesRequest
GetFileAttributesResponse
GetFileContentRequest
GetFileContentResponse
DeleteEntryRequest
DeleteEntryResponse
*/
package filer
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type LookupDirectoryEntryRequest struct {
Directory string `protobuf:"bytes,1,opt,name=directory" json:"directory,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"`
}
func (m *LookupDirectoryEntryRequest) Reset() { *m = LookupDirectoryEntryRequest{} }
func (m *LookupDirectoryEntryRequest) String() string { return proto.CompactTextString(m) }
func (*LookupDirectoryEntryRequest) ProtoMessage() {}
func (*LookupDirectoryEntryRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *LookupDirectoryEntryRequest) GetDirectory() string {
if m != nil {
return m.Directory
}
return ""
}
func (m *LookupDirectoryEntryRequest) GetName() string {
if m != nil {
return m.Name
}
return ""
}
type LookupDirectoryEntryResponse struct {
Entry *Entry `protobuf:"bytes,1,opt,name=entry" json:"entry,omitempty"`
}
func (m *LookupDirectoryEntryResponse) Reset() { *m = LookupDirectoryEntryResponse{} }
func (m *LookupDirectoryEntryResponse) String() string { return proto.CompactTextString(m) }
func (*LookupDirectoryEntryResponse) ProtoMessage() {}
func (*LookupDirectoryEntryResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *LookupDirectoryEntryResponse) GetEntry() *Entry {
if m != nil {
return m.Entry
}
return nil
}
type ListEntriesRequest struct {
Directory string `protobuf:"bytes,1,opt,name=directory" json:"directory,omitempty"`
}
func (m *ListEntriesRequest) Reset() { *m = ListEntriesRequest{} }
func (m *ListEntriesRequest) String() string { return proto.CompactTextString(m) }
func (*ListEntriesRequest) ProtoMessage() {}
func (*ListEntriesRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *ListEntriesRequest) GetDirectory() string {
if m != nil {
return m.Directory
}
return ""
}
type ListEntriesResponse struct {
Entries []*Entry `protobuf:"bytes,1,rep,name=entries" json:"entries,omitempty"`
}
func (m *ListEntriesResponse) Reset() { *m = ListEntriesResponse{} }
func (m *ListEntriesResponse) String() string { return proto.CompactTextString(m) }
func (*ListEntriesResponse) ProtoMessage() {}
func (*ListEntriesResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *ListEntriesResponse) GetEntries() []*Entry {
if m != nil {
return m.Entries
}
return nil
}
type Entry struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
IsDirectory bool `protobuf:"varint,2,opt,name=is_directory,json=isDirectory" json:"is_directory,omitempty"`
FileId string `protobuf:"bytes,3,opt,name=file_id,json=fileId" json:"file_id,omitempty"`
Attributes *FuseAttributes `protobuf:"bytes,4,opt,name=attributes" json:"attributes,omitempty"`
}
func (m *Entry) Reset() { *m = Entry{} }
func (m *Entry) String() string { return proto.CompactTextString(m) }
func (*Entry) ProtoMessage() {}
func (*Entry) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *Entry) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func (m *Entry) GetIsDirectory() bool {
if m != nil {
return m.IsDirectory
}
return false
}
func (m *Entry) GetFileId() string {
if m != nil {
return m.FileId
}
return ""
}
func (m *Entry) GetAttributes() *FuseAttributes {
if m != nil {
return m.Attributes
}
return nil
}
type FuseAttributes struct {
FileSize uint64 `protobuf:"varint,1,opt,name=file_size,json=fileSize" json:"file_size,omitempty"`
Mtime int64 `protobuf:"varint,2,opt,name=mtime" json:"mtime,omitempty"`
FileMode uint32 `protobuf:"varint,3,opt,name=file_mode,json=fileMode" json:"file_mode,omitempty"`
Uid uint32 `protobuf:"varint,4,opt,name=uid" json:"uid,omitempty"`
Gid uint32 `protobuf:"varint,5,opt,name=gid" json:"gid,omitempty"`
}
func (m *FuseAttributes) Reset() { *m = FuseAttributes{} }
func (m *FuseAttributes) String() string { return proto.CompactTextString(m) }
func (*FuseAttributes) ProtoMessage() {}
func (*FuseAttributes) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
func (m *FuseAttributes) GetFileSize() uint64 {
if m != nil {
return m.FileSize
}
return 0
}
func (m *FuseAttributes) GetMtime() int64 {
if m != nil {
return m.Mtime
}
return 0
}
func (m *FuseAttributes) GetFileMode() uint32 {
if m != nil {
return m.FileMode
}
return 0
}
func (m *FuseAttributes) GetUid() uint32 {
if m != nil {
return m.Uid
}
return 0
}
func (m *FuseAttributes) GetGid() uint32 {
if m != nil {
return m.Gid
}
return 0
}
type GetFileAttributesRequest struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
ParentDir string `protobuf:"bytes,2,opt,name=parent_dir,json=parentDir" json:"parent_dir,omitempty"`
FileId string `protobuf:"bytes,3,opt,name=file_id,json=fileId" json:"file_id,omitempty"`
}
func (m *GetFileAttributesRequest) Reset() { *m = GetFileAttributesRequest{} }
func (m *GetFileAttributesRequest) String() string { return proto.CompactTextString(m) }
func (*GetFileAttributesRequest) ProtoMessage() {}
func (*GetFileAttributesRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (m *GetFileAttributesRequest) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func (m *GetFileAttributesRequest) GetParentDir() string {
if m != nil {
return m.ParentDir
}
return ""
}
func (m *GetFileAttributesRequest) GetFileId() string {
if m != nil {
return m.FileId
}
return ""
}
type GetFileAttributesResponse struct {
Attributes *FuseAttributes `protobuf:"bytes,1,opt,name=attributes" json:"attributes,omitempty"`
}
func (m *GetFileAttributesResponse) Reset() { *m = GetFileAttributesResponse{} }
func (m *GetFileAttributesResponse) String() string { return proto.CompactTextString(m) }
func (*GetFileAttributesResponse) ProtoMessage() {}
func (*GetFileAttributesResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func (m *GetFileAttributesResponse) GetAttributes() *FuseAttributes {
if m != nil {
return m.Attributes
}
return nil
}
type GetFileContentRequest struct {
FileId string `protobuf:"bytes,1,opt,name=file_id,json=fileId" json:"file_id,omitempty"`
}
func (m *GetFileContentRequest) Reset() { *m = GetFileContentRequest{} }
func (m *GetFileContentRequest) String() string { return proto.CompactTextString(m) }
func (*GetFileContentRequest) ProtoMessage() {}
func (*GetFileContentRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
func (m *GetFileContentRequest) GetFileId() string {
if m != nil {
return m.FileId
}
return ""
}
type GetFileContentResponse struct {
Content []byte `protobuf:"bytes,1,opt,name=content,proto3" json:"content,omitempty"`
}
func (m *GetFileContentResponse) Reset() { *m = GetFileContentResponse{} }
func (m *GetFileContentResponse) String() string { return proto.CompactTextString(m) }
func (*GetFileContentResponse) ProtoMessage() {}
func (*GetFileContentResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} }
func (m *GetFileContentResponse) GetContent() []byte {
if m != nil {
return m.Content
}
return nil
}
type DeleteEntryRequest struct {
Directory string `protobuf:"bytes,1,opt,name=directory" json:"directory,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"`
IsDirectory bool `protobuf:"varint,3,opt,name=is_directory,json=isDirectory" json:"is_directory,omitempty"`
}
func (m *DeleteEntryRequest) Reset() { *m = DeleteEntryRequest{} }
func (m *DeleteEntryRequest) String() string { return proto.CompactTextString(m) }
func (*DeleteEntryRequest) ProtoMessage() {}
func (*DeleteEntryRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} }
func (m *DeleteEntryRequest) GetDirectory() string {
if m != nil {
return m.Directory
}
return ""
}
func (m *DeleteEntryRequest) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func (m *DeleteEntryRequest) GetIsDirectory() bool {
if m != nil {
return m.IsDirectory
}
return false
}
type DeleteEntryResponse struct {
}
func (m *DeleteEntryResponse) Reset() { *m = DeleteEntryResponse{} }
func (m *DeleteEntryResponse) String() string { return proto.CompactTextString(m) }
func (*DeleteEntryResponse) ProtoMessage() {}
func (*DeleteEntryResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} }
func init() {
proto.RegisterType((*LookupDirectoryEntryRequest)(nil), "filer.LookupDirectoryEntryRequest")
proto.RegisterType((*LookupDirectoryEntryResponse)(nil), "filer.LookupDirectoryEntryResponse")
proto.RegisterType((*ListEntriesRequest)(nil), "filer.ListEntriesRequest")
proto.RegisterType((*ListEntriesResponse)(nil), "filer.ListEntriesResponse")
proto.RegisterType((*Entry)(nil), "filer.Entry")
proto.RegisterType((*FuseAttributes)(nil), "filer.FuseAttributes")
proto.RegisterType((*GetFileAttributesRequest)(nil), "filer.GetFileAttributesRequest")
proto.RegisterType((*GetFileAttributesResponse)(nil), "filer.GetFileAttributesResponse")
proto.RegisterType((*GetFileContentRequest)(nil), "filer.GetFileContentRequest")
proto.RegisterType((*GetFileContentResponse)(nil), "filer.GetFileContentResponse")
proto.RegisterType((*DeleteEntryRequest)(nil), "filer.DeleteEntryRequest")
proto.RegisterType((*DeleteEntryResponse)(nil), "filer.DeleteEntryResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for SeaweedFiler service
type SeaweedFilerClient interface {
LookupDirectoryEntry(ctx context.Context, in *LookupDirectoryEntryRequest, opts ...grpc.CallOption) (*LookupDirectoryEntryResponse, error)
ListEntries(ctx context.Context, in *ListEntriesRequest, opts ...grpc.CallOption) (*ListEntriesResponse, error)
GetFileAttributes(ctx context.Context, in *GetFileAttributesRequest, opts ...grpc.CallOption) (*GetFileAttributesResponse, error)
GetFileContent(ctx context.Context, in *GetFileContentRequest, opts ...grpc.CallOption) (*GetFileContentResponse, error)
DeleteEntry(ctx context.Context, in *DeleteEntryRequest, opts ...grpc.CallOption) (*DeleteEntryResponse, error)
}
type seaweedFilerClient struct {
cc *grpc.ClientConn
}
func NewSeaweedFilerClient(cc *grpc.ClientConn) SeaweedFilerClient {
return &seaweedFilerClient{cc}
}
func (c *seaweedFilerClient) LookupDirectoryEntry(ctx context.Context, in *LookupDirectoryEntryRequest, opts ...grpc.CallOption) (*LookupDirectoryEntryResponse, error) {
out := new(LookupDirectoryEntryResponse)
err := grpc.Invoke(ctx, "/filer.SeaweedFiler/LookupDirectoryEntry", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *seaweedFilerClient) ListEntries(ctx context.Context, in *ListEntriesRequest, opts ...grpc.CallOption) (*ListEntriesResponse, error) {
out := new(ListEntriesResponse)
err := grpc.Invoke(ctx, "/filer.SeaweedFiler/ListEntries", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *seaweedFilerClient) GetFileAttributes(ctx context.Context, in *GetFileAttributesRequest, opts ...grpc.CallOption) (*GetFileAttributesResponse, error) {
out := new(GetFileAttributesResponse)
err := grpc.Invoke(ctx, "/filer.SeaweedFiler/GetFileAttributes", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *seaweedFilerClient) GetFileContent(ctx context.Context, in *GetFileContentRequest, opts ...grpc.CallOption) (*GetFileContentResponse, error) {
out := new(GetFileContentResponse)
err := grpc.Invoke(ctx, "/filer.SeaweedFiler/GetFileContent", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *seaweedFilerClient) DeleteEntry(ctx context.Context, in *DeleteEntryRequest, opts ...grpc.CallOption) (*DeleteEntryResponse, error) {
out := new(DeleteEntryResponse)
err := grpc.Invoke(ctx, "/filer.SeaweedFiler/DeleteEntry", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for SeaweedFiler service
type SeaweedFilerServer interface {
LookupDirectoryEntry(context.Context, *LookupDirectoryEntryRequest) (*LookupDirectoryEntryResponse, error)
ListEntries(context.Context, *ListEntriesRequest) (*ListEntriesResponse, error)
GetFileAttributes(context.Context, *GetFileAttributesRequest) (*GetFileAttributesResponse, error)
GetFileContent(context.Context, *GetFileContentRequest) (*GetFileContentResponse, error)
DeleteEntry(context.Context, *DeleteEntryRequest) (*DeleteEntryResponse, error)
}
func RegisterSeaweedFilerServer(s *grpc.Server, srv SeaweedFilerServer) {
s.RegisterService(&_SeaweedFiler_serviceDesc, srv)
}
func _SeaweedFiler_LookupDirectoryEntry_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(LookupDirectoryEntryRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedFilerServer).LookupDirectoryEntry(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/filer.SeaweedFiler/LookupDirectoryEntry",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedFilerServer).LookupDirectoryEntry(ctx, req.(*LookupDirectoryEntryRequest))
}
return interceptor(ctx, in, info, handler)
}
func _SeaweedFiler_ListEntries_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListEntriesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedFilerServer).ListEntries(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/filer.SeaweedFiler/ListEntries",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedFilerServer).ListEntries(ctx, req.(*ListEntriesRequest))
}
return interceptor(ctx, in, info, handler)
}
func _SeaweedFiler_GetFileAttributes_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetFileAttributesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedFilerServer).GetFileAttributes(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/filer.SeaweedFiler/GetFileAttributes",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedFilerServer).GetFileAttributes(ctx, req.(*GetFileAttributesRequest))
}
return interceptor(ctx, in, info, handler)
}
func _SeaweedFiler_GetFileContent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetFileContentRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedFilerServer).GetFileContent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/filer.SeaweedFiler/GetFileContent",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedFilerServer).GetFileContent(ctx, req.(*GetFileContentRequest))
}
return interceptor(ctx, in, info, handler)
}
func _SeaweedFiler_DeleteEntry_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeleteEntryRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedFilerServer).DeleteEntry(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/filer.SeaweedFiler/DeleteEntry",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedFilerServer).DeleteEntry(ctx, req.(*DeleteEntryRequest))
}
return interceptor(ctx, in, info, handler)
}
var _SeaweedFiler_serviceDesc = grpc.ServiceDesc{
ServiceName: "filer.SeaweedFiler",
HandlerType: (*SeaweedFilerServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "LookupDirectoryEntry",
Handler: _SeaweedFiler_LookupDirectoryEntry_Handler,
},
{
MethodName: "ListEntries",
Handler: _SeaweedFiler_ListEntries_Handler,
},
{
MethodName: "GetFileAttributes",
Handler: _SeaweedFiler_GetFileAttributes_Handler,
},
{
MethodName: "GetFileContent",
Handler: _SeaweedFiler_GetFileContent_Handler,
},
{
MethodName: "DeleteEntry",
Handler: _SeaweedFiler_DeleteEntry_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "filer.proto",
}
func init() { proto.RegisterFile("filer.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 523 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x54, 0x4d, 0x6f, 0xd3, 0x40,
0x10, 0xad, 0x71, 0xdc, 0x34, 0x93, 0xb4, 0x82, 0x69, 0x03, 0xae, 0x9b, 0x8a, 0xb0, 0x48, 0x88,
0x53, 0x85, 0x82, 0x38, 0x72, 0x00, 0x42, 0x10, 0x52, 0x51, 0x25, 0xf7, 0xc2, 0xad, 0x72, 0xeb,
0x69, 0xb5, 0x22, 0xb1, 0x83, 0x77, 0x2d, 0xd4, 0x9e, 0x39, 0xf3, 0x4b, 0xf8, 0x91, 0x68, 0x3f,
0xec, 0xd8, 0xd8, 0xa9, 0x22, 0xf5, 0xe6, 0x7d, 0x33, 0x6f, 0xe6, 0xcd, 0x97, 0xa1, 0x7f, 0xcd,
0xe7, 0x94, 0x9d, 0x2c, 0xb3, 0x54, 0xa6, 0xe8, 0xe9, 0x07, 0x3b, 0x83, 0xa3, 0xd3, 0x34, 0xfd,
0x91, 0x2f, 0xa7, 0x3c, 0xa3, 0x2b, 0x99, 0x66, 0xb7, 0x9f, 0x13, 0x99, 0xdd, 0x86, 0xf4, 0x33,
0x27, 0x21, 0x71, 0x04, 0xbd, 0xb8, 0x30, 0xf8, 0xce, 0xd8, 0x79, 0xdd, 0x0b, 0x57, 0x00, 0x22,
0x74, 0x92, 0x68, 0x41, 0xfe, 0x23, 0x6d, 0xd0, 0xdf, 0xec, 0x23, 0x8c, 0xda, 0x03, 0x8a, 0x65,
0x9a, 0x08, 0x42, 0x06, 0x1e, 0x29, 0x40, 0x47, 0xeb, 0x4f, 0x06, 0x27, 0x46, 0x94, 0x71, 0x32,
0x26, 0x36, 0x01, 0x3c, 0xe5, 0x42, 0x2a, 0x8c, 0x93, 0xd8, 0x48, 0x0b, 0x7b, 0x0f, 0xfb, 0x35,
0x8e, 0x4d, 0xf7, 0x0a, 0xba, 0x64, 0x20, 0xdf, 0x19, 0xbb, 0x8d, 0x84, 0x85, 0x91, 0xfd, 0x71,
0xc0, 0xd3, 0x50, 0x59, 0x94, 0xb3, 0x2a, 0x0a, 0x5f, 0xc0, 0x80, 0x8b, 0x8b, 0x55, 0x76, 0x55,
0xf0, 0x4e, 0xd8, 0xe7, 0xa2, 0x2c, 0x12, 0x9f, 0x41, 0x57, 0x05, 0xbe, 0xe0, 0xb1, 0xef, 0x6a,
0xe6, 0xb6, 0x7a, 0x7e, 0x8d, 0xf1, 0x1d, 0x40, 0x24, 0x65, 0xc6, 0x2f, 0x73, 0x49, 0xc2, 0xef,
0xe8, 0xaa, 0x87, 0x56, 0xc4, 0x2c, 0x17, 0xf4, 0xa1, 0x34, 0x86, 0x15, 0x47, 0xf6, 0xdb, 0x81,
0xbd, 0xba, 0x19, 0x8f, 0xa0, 0xa7, 0x53, 0x08, 0x7e, 0x67, 0xe4, 0x75, 0xc2, 0x1d, 0x05, 0x9c,
0xf3, 0x3b, 0xc2, 0x03, 0xf0, 0x16, 0x92, 0xdb, 0x61, 0xb8, 0xa1, 0x79, 0x94, 0x94, 0x45, 0x1a,
0x93, 0xd6, 0xb5, 0x6b, 0x28, 0xdf, 0xd2, 0x98, 0xf0, 0x31, 0xb8, 0x39, 0x8f, 0xb5, 0xa4, 0xdd,
0x50, 0x7d, 0x2a, 0xe4, 0x86, 0xc7, 0xbe, 0x67, 0x90, 0x1b, 0x1e, 0xb3, 0x6b, 0xf0, 0xbf, 0x90,
0x9c, 0xf1, 0x79, 0x55, 0xa7, 0x1d, 0x48, 0x5b, 0xa7, 0x8e, 0x01, 0x96, 0x51, 0x46, 0x89, 0x54,
0xdd, 0xb2, 0x8b, 0xd1, 0x33, 0xc8, 0x94, 0x67, 0x6b, 0xbb, 0xc4, 0x42, 0x38, 0x6c, 0xc9, 0x63,
0x87, 0x58, 0x6f, 0xa1, 0xb3, 0x69, 0x0b, 0xdf, 0xc0, 0xd0, 0xc6, 0xfc, 0x94, 0x26, 0x92, 0x12,
0x59, 0x08, 0xaf, 0xa8, 0x70, 0x6a, 0x2a, 0x26, 0xf0, 0xf4, 0x7f, 0x86, 0x95, 0xe0, 0x43, 0xf7,
0xca, 0x40, 0x9a, 0x32, 0x08, 0x8b, 0x27, 0xe3, 0x80, 0x53, 0x9a, 0x93, 0xa4, 0x87, 0x1d, 0x4e,
0x63, 0xc7, 0xdc, 0xc6, 0x8e, 0xb1, 0x21, 0xec, 0xd7, 0x52, 0x19, 0x6d, 0x93, 0xbf, 0x2e, 0x0c,
0xce, 0x29, 0xfa, 0x45, 0x14, 0x2b, 0xe9, 0x19, 0x46, 0x70, 0xd0, 0x76, 0x83, 0xc8, 0x6c, 0xcf,
0xee, 0xb9, 0xf8, 0xe0, 0xe5, 0xbd, 0x3e, 0x26, 0x23, 0xdb, 0xc2, 0x19, 0xf4, 0x2b, 0xe7, 0x86,
0x87, 0x05, 0xab, 0x71, 0xb6, 0x41, 0xd0, 0x66, 0x2a, 0xe3, 0x7c, 0x87, 0x27, 0x8d, 0xb9, 0xe3,
0x73, 0x4b, 0x59, 0xb7, 0x79, 0xc1, 0x78, 0xbd, 0x43, 0x19, 0xf9, 0x0c, 0xf6, 0xea, 0xb3, 0xc4,
0x51, 0x9d, 0x55, 0x5f, 0x8a, 0xe0, 0x78, 0x8d, 0xb5, 0x5a, 0x72, 0xa5, 0xfb, 0x65, 0xc9, 0xcd,
0xe1, 0x97, 0x25, 0xb7, 0x0c, 0x8b, 0x6d, 0x5d, 0x6e, 0xeb, 0x1f, 0xf0, 0xdb, 0x7f, 0x01, 0x00,
0x00, 0xff, 0xff, 0x6f, 0xb3, 0x14, 0x9b, 0x8f, 0x05, 0x00, 0x00,
}

100
weed/filesys/dir.go

@ -10,6 +10,7 @@ import (
"bazil.org/fuse"
"github.com/chrislusf/seaweedfs/weed/filer"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
)
type Dir struct {
@ -20,10 +21,22 @@ type Dir struct {
}
func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error {
attr.Mode = os.ModeDir | 0555
attr.Mode = os.ModeDir | 0777
return nil
}
func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
dir.NodeMapLock.Lock()
defer dir.NodeMapLock.Unlock()
fmt.Printf("mkdir %+v\n", req)
node := &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs}
dir.NodeMap[req.Name] = node
return node, nil
}
func (dir *Dir) Lookup(ctx context.Context, name string) (node fs.Node, err error) {
dir.NodeMapLock.Lock()
@ -37,37 +50,66 @@ func (dir *Dir) Lookup(ctx context.Context, name string) (node fs.Node, err erro
return node, nil
}
if entry, err := filer.LookupDirectoryEntry(dir.wfs.filer, dir.Path, name); err == nil {
if !entry.Found {
return nil, fuse.ENOENT
var entry *filer.Entry
err = dir.wfs.withFilerClient(func(client filer.SeaweedFilerClient) error {
request := &filer.LookupDirectoryEntryRequest{
Directory: dir.Path,
Name: name,
}
if entry.FileId != "" {
node = &File{FileId: filer.FileId(entry.FileId), Name: name, wfs: dir.wfs}
} else {
glog.V(1).Infof("lookup directory entry: %v", request)
resp, err := client.LookupDirectoryEntry(ctx, request)
if err != nil {
return err
}
entry = resp.Entry
return nil
})
if entry != nil {
if entry.IsDirectory {
node = &Dir{Path: path.Join(dir.Path, name), wfs: dir.wfs}
} else {
node = &File{FileId: filer.FileId(entry.FileId), Name: name, wfs: dir.wfs}
}
dir.NodeMap[name] = node
return node, nil
}
return nil, fuse.ENOENT
return nil, err
}
func (dir *Dir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
var ret []fuse.Dirent
if dirs, e := filer.ListDirectories(dir.wfs.filer, dir.Path); e == nil {
for _, d := range dirs.Directories {
dirent := fuse.Dirent{Name: string(d), Type: fuse.DT_Dir}
ret = append(ret, dirent)
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
err = dir.wfs.withFilerClient(func(client filer.SeaweedFilerClient) error {
request := &filer.ListEntriesRequest{
Directory: dir.Path,
}
glog.V(1).Infof("read directory: %v", request)
resp, err := client.ListEntries(ctx, request)
if err != nil {
return err
}
if files, e := filer.ListFiles(dir.wfs.filer, dir.Path, ""); e == nil {
for _, f := range files.Files {
dirent := fuse.Dirent{Name: f.Name, Type: fuse.DT_File}
for _, entry := range resp.Entries {
if entry.IsDirectory {
dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir}
ret = append(ret, dirent)
} else {
dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File}
ret = append(ret, dirent)
}
}
return ret, nil
return nil
})
return ret, err
}
func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
@ -75,13 +117,23 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
dir.NodeMapLock.Lock()
defer dir.NodeMapLock.Unlock()
name := path.Join(dir.Path, req.Name)
err := filer.DeleteDirectoryOrFile(dir.wfs.filer, name, req.Dir)
if err != nil {
fmt.Printf("Delete file %s [ERROR] %s\n", name, err)
} else {
delete(dir.NodeMap, req.Name)
return dir.wfs.withFilerClient(func(client filer.SeaweedFilerClient) error {
request := &filer.DeleteEntryRequest{
Directory: dir.Path,
Name: req.Name,
IsDirectory: req.Dir,
}
glog.V(1).Infof("remove directory entry: %v", request)
_, err := client.DeleteEntry(ctx, request)
if err != nil {
return err
}
delete(dir.NodeMap, req.Name)
return nil
})
}

61
weed/filesys/file.go

@ -6,8 +6,18 @@ import (
"bazil.org/fuse"
"github.com/chrislusf/seaweedfs/weed/filer"
"bazil.org/fuse/fs"
"github.com/chrislusf/seaweedfs/weed/glog"
)
var _ = fs.Node(&File{})
// var _ = fs.NodeOpener(&File{})
// var _ = fs.NodeFsyncer(&File{})
var _ = fs.Handle(&File{})
var _ = fs.HandleReadAller(&File{})
// var _ = fs.HandleReader(&File{})
var _ = fs.HandleWriter(&File{})
type File struct {
FileId filer.FileId
Name string
@ -16,20 +26,49 @@ type File struct {
func (file *File) Attr(context context.Context, attr *fuse.Attr) error {
attr.Mode = 0444
ret, err := filer.GetFileSize(file.wfs.filer, string(file.FileId))
if err == nil {
attr.Size = ret.Size
} else {
fmt.Printf("Get file %s attr [ERROR] %s\n", file.Name, err)
return file.wfs.withFilerClient(func(client filer.SeaweedFilerClient) error {
request := &filer.GetFileAttributesRequest{
Name: file.Name,
ParentDir: "", //TODO add parent folder
FileId: string(file.FileId),
}
glog.V(1).Infof("read file size: %v", request)
resp, err := client.GetFileAttributes(context, request)
if err != nil {
return err
}
attr.Size = resp.Attributes.FileSize
return nil
})
}
func (file *File) ReadAll(ctx context.Context) (content []byte, err error) {
err = file.wfs.withFilerClient(func(client filer.SeaweedFilerClient) error {
request := &filer.GetFileContentRequest{
FileId: string(file.FileId),
}
glog.V(1).Infof("read file content: %v", request)
resp, err := client.GetFileContent(ctx, request)
if err != nil {
return err
}
func (file *File) ReadAll(ctx context.Context) ([]byte, error) {
ret, err := filer.GetFileContent(file.wfs.filer, string(file.FileId))
if err == nil {
return ret.Content, nil
content = resp.Content
return nil
})
return content, err
}
fmt.Printf("Get file %s content [ERROR] %s\n", file.Name, err)
return nil, err
func (file *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
fmt.Printf("write file %+v\n", req)
return nil
}

20
weed/filesys/wfs.go

@ -1,6 +1,11 @@
package filesys
import "bazil.org/fuse/fs"
import (
"bazil.org/fuse/fs"
"fmt"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/filer"
)
type WFS struct {
filer string
@ -15,3 +20,16 @@ func NewSeaweedFileSystem(filer string) *WFS {
func (wfs *WFS) Root() (fs.Node, error) {
return &Dir{Path: "/", wfs: wfs}, nil
}
func (wfs *WFS) withFilerClient(fn func(filer.SeaweedFilerClient) error) error {
grpcConnection, err := grpc.Dial(wfs.filer, grpc.WithInsecure())
if err != nil {
return fmt.Errorf("fail to dial %s: %v", wfs.filer, err)
}
defer grpcConnection.Close()
client := filer.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}

1
weed/pb/Makefile

@ -4,3 +4,4 @@ all: gen
gen:
protoc seaweed.proto --go_out=plugins=grpc:.
protoc filer.proto --go_out=plugins=grpc:../filer

85
weed/pb/filer.proto

@ -0,0 +1,85 @@
syntax = "proto3";
package filer;
//////////////////////////////////////////////////
service SeaweedFiler {
rpc LookupDirectoryEntry (LookupDirectoryEntryRequest) returns (LookupDirectoryEntryResponse) {
}
rpc ListEntries (ListEntriesRequest) returns (ListEntriesResponse) {
}
rpc GetFileAttributes (GetFileAttributesRequest) returns (GetFileAttributesResponse) {
}
rpc GetFileContent (GetFileContentRequest) returns (GetFileContentResponse) {
}
rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) {
}
}
//////////////////////////////////////////////////
message LookupDirectoryEntryRequest {
string directory = 1;
string name = 2;
}
message LookupDirectoryEntryResponse {
Entry entry = 1;
}
message ListEntriesRequest {
string directory = 1;
}
message ListEntriesResponse {
repeated Entry entries = 1;
}
message Entry {
string name = 1;
bool is_directory = 2;
string file_id = 3;
FuseAttributes attributes = 4;
}
message FuseAttributes {
uint64 file_size = 1;
int64 mtime = 2;
uint32 file_mode = 3;
uint32 uid = 4;
uint32 gid = 5;
}
message GetFileAttributesRequest {
string name = 1;
string parent_dir = 2;
string file_id = 3;
}
message GetFileAttributesResponse {
FuseAttributes attributes = 1;
}
message GetFileContentRequest {
string file_id = 1;
}
message GetFileContentResponse {
bytes content = 1;
}
message DeleteEntryRequest {
string directory = 1;
string name = 2;
bool is_directory = 3;
}
message DeleteEntryResponse {
}

108
weed/server/filer_grpc_server.go

@ -0,0 +1,108 @@
package weed_server
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
"bazil.org/fuse"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/util"
"strconv"
)
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer.LookupDirectoryEntryRequest) (*filer.LookupDirectoryEntryResponse, error) {
found, fileId, err := fs.filer.LookupDirectoryEntry(req.Directory, req.Name)
if err != nil {
return nil, err
}
if !found {
return nil, fuse.ENOENT
}
return &filer.LookupDirectoryEntryResponse{
Entry: &filer.Entry{
Name: req.Name,
IsDirectory: fileId == "",
FileId: fileId,
},
}, nil
}
func (fs *FilerServer) ListEntries(ctx context.Context, req *filer.ListEntriesRequest) (*filer.ListEntriesResponse, error) {
directoryNames, err := fs.filer.ListDirectories(req.Directory)
if err != nil {
return nil, err
}
files, err := fs.filer.ListFiles(req.Directory, "", 1000)
if err != nil {
return nil, err
}
resp := &filer.ListEntriesResponse{}
for _, dir := range directoryNames {
resp.Entries = append(resp.Entries, &filer.Entry{
Name: string(dir),
IsDirectory: true,
})
}
for _, fileEntry := range files {
resp.Entries = append(resp.Entries, &filer.Entry{
Name: fileEntry.Name,
IsDirectory: false,
FileId: string(fileEntry.Id),
})
}
return resp, nil
}
func (fs *FilerServer) GetFileAttributes(ctx context.Context, req *filer.GetFileAttributesRequest) (*filer.GetFileAttributesResponse, error) {
attributes := &filer.FuseAttributes{}
server, err := operation.LookupFileId(fs.getMasterNode(), req.FileId)
if err != nil {
return nil, err
}
head, err := util.Head(server)
if err != nil {
return nil, err
}
attributes.FileSize, err = strconv.ParseUint(head.Get("Content-Length"), 10, 0)
if err != nil {
return nil, err
}
return &filer.GetFileAttributesResponse{
Attributes: attributes,
}, nil
}
func (fs *FilerServer) GetFileContent(ctx context.Context, req *filer.GetFileContentRequest) (*filer.GetFileContentResponse, error) {
server, err := operation.LookupFileId(fs.getMasterNode(), req.FileId)
if err != nil {
return nil, err
}
content, err := util.Get(server)
if err != nil {
return nil, err
}
return &filer.GetFileContentResponse{
Content: content,
}, nil
}
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer.DeleteEntryRequest) (resp *filer.DeleteEntryResponse, err error) {
if req.IsDirectory {
err = fs.filer.DeleteDirectory(req.Directory+req.Name, false)
} else {
fid, err := fs.filer.DeleteFile(req.Directory + req.Name)
if err == nil && fid != "" {
err = operation.DeleteFile(fs.getMasterNode(), fid, fs.jwt(fid))
}
}
return nil, err
}

1
weed/server/filer_server.go

@ -106,7 +106,6 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int,
}
defaultMux.HandleFunc("/admin/mv", fs.moveHandler)
defaultMux.HandleFunc("/__api__", fs.apiHandler)
}
defaultMux.HandleFunc("/admin/register", fs.registerHandler)

83
weed/server/filer_server_handlers_api.go

@ -1,83 +0,0 @@
package weed_server
import (
"encoding/json"
"net/http"
"strconv"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *FilerServer) apiHandler(w http.ResponseWriter, r *http.Request) {
request := r.FormValue("request")
apiRequest := filer.ApiRequest{}
err := json.Unmarshal([]byte(request), &apiRequest)
if err != nil {
glog.V(0).Infoln("failing to read request", r.RequestURI, request)
writeJsonError(w, r, http.StatusInternalServerError, err)
}
glog.V(2).Infof("api request: %+v", apiRequest)
switch apiRequest.Command {
case "lookupDirectoryEntry":
res := filer.LookupDirectoryEntryResult{}
res.Found, res.FileId, err = fs.filer.LookupDirectoryEntry(apiRequest.Directory, apiRequest.FileName)
if err != nil {
res.Error = err.Error()
}
writeJsonQuiet(w, r, http.StatusOK, res)
case "listDirectories":
res := filer.ListDirectoriesResult{}
res.Directories, err = fs.filer.ListDirectories(apiRequest.Directory)
if err != nil {
res.Error = err.Error()
}
writeJsonQuiet(w, r, http.StatusOK, res)
case "listFiles":
res := filer.ListFilesResult{}
res.Files, err = fs.filer.ListFiles(apiRequest.Directory, apiRequest.FileName, 1000)
if err != nil {
res.Error = err.Error()
}
writeJsonQuiet(w, r, http.StatusOK, res)
case "getFileSize":
res := filer.GetFileSizeResult{}
server, err := operation.LookupFileId(fs.getMasterNode(), apiRequest.FileId)
if err != nil {
res.Error = err.Error()
writeJsonQuiet(w, r, http.StatusOK, res)
return
}
head, err := util.Head(server)
if err != nil {
res.Error = err.Error()
writeJsonQuiet(w, r, http.StatusOK, res)
return
}
res.Size, err = strconv.ParseUint(head.Get("Content-Length"), 10, 0)
if err != nil {
res.Error = err.Error()
}
writeJsonQuiet(w, r, http.StatusOK, res)
case "getFileContent":
res := filer.GetFileContentResult{}
server, err := operation.LookupFileId(fs.getMasterNode(), apiRequest.FileId)
if err != nil {
res.Error = err.Error()
writeJsonQuiet(w, r, http.StatusOK, res)
return
}
res.Content, err = util.Get(server)
if err != nil {
res.Error = err.Error()
writeJsonQuiet(w, r, http.StatusOK, res)
return
}
writeJsonQuiet(w, r, http.StatusOK, res)
}
}
Loading…
Cancel
Save