Chris Lu
3 years ago
19 changed files with 539 additions and 65 deletions
-
11go.mod
-
22go.sum
-
6weed/pb/remote.proto
-
153weed/pb/remote_pb/remote.pb.go
-
4weed/remote_storage/azure/azure_storage_client.go
-
4weed/remote_storage/gcs/gcs_storage_client.go
-
55weed/remote_storage/hdfs/hdfs_kerberos.go
-
174weed/remote_storage/hdfs/hdfs_storage_client.go
-
63weed/remote_storage/hdfs/traverse_bfs.go
-
42weed/remote_storage/remote_storage.go
-
4weed/remote_storage/s3/aliyun.go
-
4weed/remote_storage/s3/backblaze.go
-
4weed/remote_storage/s3/baidu.go
-
4weed/remote_storage/s3/s3_storage_client.go
-
4weed/remote_storage/s3/tencent.go
-
4weed/remote_storage/s3/wasabi.go
-
30weed/shell/command_remote_configure.go
-
6weed/shell/command_remote_meta_sync.go
-
10weed/shell/command_remote_mount.go
@ -0,0 +1,55 @@ |
|||||
|
package hdfs |
||||
|
|
||||
|
import ( |
||||
|
"fmt" |
||||
|
"os" |
||||
|
"os/user" |
||||
|
"strings" |
||||
|
|
||||
|
krb "github.com/jcmturner/gokrb5/v8/client" |
||||
|
"github.com/jcmturner/gokrb5/v8/config" |
||||
|
"github.com/jcmturner/gokrb5/v8/credentials" |
||||
|
) |
||||
|
|
||||
|
// copy-paste from https://github.com/colinmarc/hdfs/blob/master/cmd/hdfs/kerberos.go
|
||||
|
func getKerberosClient() (*krb.Client, error) { |
||||
|
configPath := os.Getenv("KRB5_CONFIG") |
||||
|
if configPath == "" { |
||||
|
configPath = "/etc/krb5.conf" |
||||
|
} |
||||
|
|
||||
|
cfg, err := config.Load(configPath) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
|
||||
|
// Determine the ccache location from the environment, falling back to the
|
||||
|
// default location.
|
||||
|
ccachePath := os.Getenv("KRB5CCNAME") |
||||
|
if strings.Contains(ccachePath, ":") { |
||||
|
if strings.HasPrefix(ccachePath, "FILE:") { |
||||
|
ccachePath = strings.SplitN(ccachePath, ":", 2)[1] |
||||
|
} else { |
||||
|
return nil, fmt.Errorf("unusable ccache: %s", ccachePath) |
||||
|
} |
||||
|
} else if ccachePath == "" { |
||||
|
u, err := user.Current() |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
|
||||
|
ccachePath = fmt.Sprintf("/tmp/krb5cc_%s", u.Uid) |
||||
|
} |
||||
|
|
||||
|
ccache, err := credentials.LoadCCache(ccachePath) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
|
||||
|
client, err := krb.NewFromCCache(ccache, cfg) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
|
||||
|
return client, nil |
||||
|
} |
@ -0,0 +1,174 @@ |
|||||
|
package hdfs |
||||
|
|
||||
|
import ( |
||||
|
"fmt" |
||||
|
"github.com/chrislusf/seaweedfs/weed/glog" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/remote_storage" |
||||
|
"github.com/chrislusf/seaweedfs/weed/util" |
||||
|
"github.com/colinmarc/hdfs/v2" |
||||
|
"io" |
||||
|
"os" |
||||
|
"path" |
||||
|
) |
||||
|
|
||||
|
func init() { |
||||
|
remote_storage.RemoteStorageClientMakers["hdfs"] = new(hdfsRemoteStorageMaker) |
||||
|
} |
||||
|
|
||||
|
type hdfsRemoteStorageMaker struct{} |
||||
|
|
||||
|
func (s hdfsRemoteStorageMaker) HasBucket() bool { |
||||
|
return false |
||||
|
} |
||||
|
|
||||
|
func (s hdfsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { |
||||
|
client := &hdfsRemoteStorageClient{ |
||||
|
conf: conf, |
||||
|
} |
||||
|
|
||||
|
options := hdfs.ClientOptions{ |
||||
|
Addresses: conf.HdfsNamenodes, |
||||
|
UseDatanodeHostname: false, |
||||
|
} |
||||
|
|
||||
|
if conf.HdfsServicePrincipalName != "" { |
||||
|
var err error |
||||
|
options.KerberosClient, err = getKerberosClient() |
||||
|
if err != nil { |
||||
|
return nil, fmt.Errorf("get kerberos authentication: %s", err) |
||||
|
} |
||||
|
options.KerberosServicePrincipleName = conf.HdfsServicePrincipalName |
||||
|
|
||||
|
if conf.HdfsDataTransferProtection != "" { |
||||
|
options.DataTransferProtection = conf.HdfsDataTransferProtection |
||||
|
} |
||||
|
} else { |
||||
|
options.User = conf.HdfsUsername |
||||
|
} |
||||
|
|
||||
|
c, err := hdfs.NewClient(options) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
|
||||
|
client.client = c |
||||
|
return client, nil |
||||
|
} |
||||
|
|
||||
|
type hdfsRemoteStorageClient struct { |
||||
|
conf *remote_pb.RemoteConf |
||||
|
client *hdfs.Client |
||||
|
} |
||||
|
|
||||
|
var _ = remote_storage.RemoteStorageClient(&hdfsRemoteStorageClient{}) |
||||
|
|
||||
|
func (c *hdfsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) { |
||||
|
|
||||
|
return TraverseBfs(func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error { |
||||
|
children, err := c.client.ReadDir(string(parentDir)) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
for _, child := range children { |
||||
|
if err := visitFn(string(parentDir), child.Name(), child.IsDir(), &filer_pb.RemoteEntry{ |
||||
|
StorageName: c.conf.Name, |
||||
|
LastLocalSyncTsNs: 0, |
||||
|
RemoteETag: "", |
||||
|
RemoteMtime: child.ModTime().Unix(), |
||||
|
RemoteSize: child.Size(), |
||||
|
}); err != nil { |
||||
|
return nil |
||||
|
} |
||||
|
} |
||||
|
return nil |
||||
|
}, util.FullPath(loc.Path), visitFn) |
||||
|
|
||||
|
} |
||||
|
func (c *hdfsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) { |
||||
|
|
||||
|
f, err := c.client.Open(loc.Path) |
||||
|
if err != nil { |
||||
|
return |
||||
|
} |
||||
|
defer f.Close() |
||||
|
data = make([]byte, size) |
||||
|
_, err = f.ReadAt(data, offset) |
||||
|
|
||||
|
return |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (c *hdfsRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) { |
||||
|
return c.client.MkdirAll(loc.Path, os.FileMode(entry.Attributes.FileMode)) |
||||
|
} |
||||
|
|
||||
|
func (c *hdfsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) { |
||||
|
|
||||
|
dirname := path.Dir(loc.Path) |
||||
|
|
||||
|
// ensure parent directory
|
||||
|
if err = c.client.MkdirAll(dirname, 0755); err != nil { |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
// remove existing file
|
||||
|
info, err := c.client.Stat(loc.Path) |
||||
|
if err == nil { |
||||
|
err = c.client.Remove(loc.Path) |
||||
|
if err != nil { |
||||
|
return |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// create new file
|
||||
|
out, err := c.client.Create(loc.Path) |
||||
|
if err != nil { |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
cleanup := func() { |
||||
|
if removeErr := c.client.Remove(loc.Path); removeErr != nil { |
||||
|
glog.Errorf("clean up %s%s: %v", loc.Name, loc.Path, removeErr) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
if _, err = io.Copy(out, reader); err != nil { |
||||
|
cleanup() |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
if err = out.Close(); err != nil { |
||||
|
cleanup() |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
info, err = c.client.Stat(loc.Path) |
||||
|
if err != nil { |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
return &filer_pb.RemoteEntry{ |
||||
|
RemoteMtime: info.ModTime().Unix(), |
||||
|
RemoteSize: info.Size(), |
||||
|
RemoteETag: "", |
||||
|
StorageName: c.conf.Name, |
||||
|
}, nil |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (c *hdfsRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error { |
||||
|
if oldEntry.Attributes.FileMode != newEntry.Attributes.FileMode { |
||||
|
if err := c.client.Chmod(loc.Path, os.FileMode(newEntry.Attributes.FileMode)); err != nil { |
||||
|
return err |
||||
|
} |
||||
|
} |
||||
|
return nil |
||||
|
} |
||||
|
func (c *hdfsRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) { |
||||
|
if err = c.client.Remove(loc.Path); err != nil { |
||||
|
return fmt.Errorf("hdfs delete %s: %v", loc.Path, err) |
||||
|
} |
||||
|
return |
||||
|
} |
@ -0,0 +1,63 @@ |
|||||
|
package hdfs |
||||
|
|
||||
|
import ( |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/remote_storage" |
||||
|
"github.com/chrislusf/seaweedfs/weed/util" |
||||
|
"sync" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
type ListDirectoryFunc func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error |
||||
|
|
||||
|
func TraverseBfs(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn remote_storage.VisitFunc) (err error) { |
||||
|
K := 5 |
||||
|
|
||||
|
var dirQueueWg sync.WaitGroup |
||||
|
dirQueue := util.NewQueue() |
||||
|
dirQueueWg.Add(1) |
||||
|
dirQueue.Enqueue(parentPath) |
||||
|
var isTerminating bool |
||||
|
|
||||
|
for i := 0; i < K; i++ { |
||||
|
go func() { |
||||
|
for { |
||||
|
if isTerminating { |
||||
|
break |
||||
|
} |
||||
|
t := dirQueue.Dequeue() |
||||
|
if t == nil { |
||||
|
time.Sleep(329 * time.Millisecond) |
||||
|
continue |
||||
|
} |
||||
|
dir := t.(util.FullPath) |
||||
|
processErr := processOneDirectory(listDirFn, dir, visitFn, dirQueue, &dirQueueWg) |
||||
|
if processErr != nil { |
||||
|
err = processErr |
||||
|
} |
||||
|
dirQueueWg.Done() |
||||
|
} |
||||
|
}() |
||||
|
} |
||||
|
|
||||
|
dirQueueWg.Wait() |
||||
|
isTerminating = true |
||||
|
return |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func processOneDirectory(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn remote_storage.VisitFunc, dirQueue *util.Queue, dirQueueWg *sync.WaitGroup) (error) { |
||||
|
|
||||
|
return listDirFn(parentPath, func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error { |
||||
|
if err := visitFn(dir, name, isDirectory, remoteEntry); err != nil { |
||||
|
return err |
||||
|
} |
||||
|
if !isDirectory { |
||||
|
return nil |
||||
|
} |
||||
|
dirQueueWg.Add(1) |
||||
|
dirQueue.Enqueue(parentPath.Child(name)) |
||||
|
return nil |
||||
|
}) |
||||
|
|
||||
|
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue