chrislu
3 years ago
11 changed files with 7 additions and 295 deletions
-
4.github/workflows/binaries_release4.yml
-
2.github/workflows/container_release4.yml
-
2.github/workflows/container_release5.yml
-
4.github/workflows/go.yml
-
2Makefile
-
1weed/command/imports.go
-
5weed/pb/remote.proto
-
9weed/remote_storage/hdfs/doc.go
-
58weed/remote_storage/hdfs/hdfs_kerberos.go
-
194weed/remote_storage/hdfs/hdfs_storage_client.go
-
21weed/shell/command_remote_configure.go
@ -1,9 +0,0 @@ |
|||||
/*
Package hdfs is for remote hdfs storage.

The referenced "github.com/colinmarc/hdfs/v2" library is too big when compiled.
So this is only compiled in "make full_install".
*/
package hdfs
@ -1,58 +0,0 @@ |
|||||
//go:build hdfs
|
|
||||
// +build hdfs
|
|
||||
|
|
||||
package hdfs |
|
||||
|
|
||||
import ( |
|
||||
"fmt" |
|
||||
"os" |
|
||||
"os/user" |
|
||||
"strings" |
|
||||
|
|
||||
krb "github.com/jcmturner/gokrb5/v8/client" |
|
||||
"github.com/jcmturner/gokrb5/v8/config" |
|
||||
"github.com/jcmturner/gokrb5/v8/credentials" |
|
||||
) |
|
||||
|
|
||||
// copy-paste from https://github.com/colinmarc/hdfs/blob/master/cmd/hdfs/kerberos.go
|
|
||||
func getKerberosClient() (*krb.Client, error) { |
|
||||
configPath := os.Getenv("KRB5_CONFIG") |
|
||||
if configPath == "" { |
|
||||
configPath = "/etc/krb5.conf" |
|
||||
} |
|
||||
|
|
||||
cfg, err := config.Load(configPath) |
|
||||
if err != nil { |
|
||||
return nil, err |
|
||||
} |
|
||||
|
|
||||
// Determine the ccache location from the environment, falling back to the
|
|
||||
// default location.
|
|
||||
ccachePath := os.Getenv("KRB5CCNAME") |
|
||||
if strings.Contains(ccachePath, ":") { |
|
||||
if strings.HasPrefix(ccachePath, "FILE:") { |
|
||||
ccachePath = strings.SplitN(ccachePath, ":", 2)[1] |
|
||||
} else { |
|
||||
return nil, fmt.Errorf("unusable ccache: %s", ccachePath) |
|
||||
} |
|
||||
} else if ccachePath == "" { |
|
||||
u, err := user.Current() |
|
||||
if err != nil { |
|
||||
return nil, err |
|
||||
} |
|
||||
|
|
||||
ccachePath = fmt.Sprintf("/tmp/krb5cc_%s", u.Uid) |
|
||||
} |
|
||||
|
|
||||
ccache, err := credentials.LoadCCache(ccachePath) |
|
||||
if err != nil { |
|
||||
return nil, err |
|
||||
} |
|
||||
|
|
||||
client, err := krb.NewFromCCache(ccache, cfg) |
|
||||
if err != nil { |
|
||||
return nil, err |
|
||||
} |
|
||||
|
|
||||
return client, nil |
|
||||
} |
|
@ -1,194 +0,0 @@ |
|||||
//go:build hdfs
|
|
||||
// +build hdfs
|
|
||||
|
|
||||
package hdfs |
|
||||
|
|
||||
import ( |
|
||||
"fmt" |
|
||||
"github.com/chrislusf/seaweedfs/weed/glog" |
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb" |
|
||||
"github.com/chrislusf/seaweedfs/weed/remote_storage" |
|
||||
"github.com/chrislusf/seaweedfs/weed/util" |
|
||||
hdfs "github.com/colinmarc/hdfs/v2" |
|
||||
"io" |
|
||||
"os" |
|
||||
"path" |
|
||||
) |
|
||||
|
|
||||
func init() { |
|
||||
remote_storage.RemoteStorageClientMakers["hdfs"] = new(hdfsRemoteStorageMaker) |
|
||||
} |
|
||||
|
|
||||
// hdfsRemoteStorageMaker builds hdfs-backed remote storage clients; it is
// registered in init() under the "hdfs" storage type name.
type hdfsRemoteStorageMaker struct{}
|
||||
|
|
||||
func (s hdfsRemoteStorageMaker) HasBucket() bool { |
|
||||
return false |
|
||||
} |
|
||||
|
|
||||
func (s hdfsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { |
|
||||
client := &hdfsRemoteStorageClient{ |
|
||||
conf: conf, |
|
||||
} |
|
||||
|
|
||||
options := hdfs.ClientOptions{ |
|
||||
Addresses: conf.HdfsNamenodes, |
|
||||
UseDatanodeHostname: false, |
|
||||
} |
|
||||
|
|
||||
if conf.HdfsServicePrincipalName != "" { |
|
||||
var err error |
|
||||
options.KerberosClient, err = getKerberosClient() |
|
||||
if err != nil { |
|
||||
return nil, fmt.Errorf("get kerberos authentication: %s", err) |
|
||||
} |
|
||||
options.KerberosServicePrincipleName = conf.HdfsServicePrincipalName |
|
||||
|
|
||||
if conf.HdfsDataTransferProtection != "" { |
|
||||
options.DataTransferProtection = conf.HdfsDataTransferProtection |
|
||||
} |
|
||||
} else { |
|
||||
options.User = conf.HdfsUsername |
|
||||
} |
|
||||
|
|
||||
c, err := hdfs.NewClient(options) |
|
||||
if err != nil { |
|
||||
return nil, err |
|
||||
} |
|
||||
|
|
||||
client.client = c |
|
||||
return client, nil |
|
||||
} |
|
||||
|
|
||||
// hdfsRemoteStorageClient implements remote_storage.RemoteStorageClient on
// top of an HDFS cluster via github.com/colinmarc/hdfs/v2.
type hdfsRemoteStorageClient struct {
	conf   *remote_pb.RemoteConf // configuration this client was built from
	client *hdfs.Client          // underlying HDFS connection
}

// compile-time check that the interface is fully implemented
var _ = remote_storage.RemoteStorageClient(&hdfsRemoteStorageClient{})
|
||||
|
|
||||
func (c *hdfsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) { |
|
||||
|
|
||||
return remote_storage.TraverseBfs(func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error { |
|
||||
children, err := c.client.ReadDir(string(parentDir)) |
|
||||
if err != nil { |
|
||||
return err |
|
||||
} |
|
||||
for _, child := range children { |
|
||||
if err := visitFn(string(parentDir), child.Name(), child.IsDir(), &filer_pb.RemoteEntry{ |
|
||||
StorageName: c.conf.Name, |
|
||||
LastLocalSyncTsNs: 0, |
|
||||
RemoteETag: "", |
|
||||
RemoteMtime: child.ModTime().Unix(), |
|
||||
RemoteSize: child.Size(), |
|
||||
}); err != nil { |
|
||||
return nil |
|
||||
} |
|
||||
} |
|
||||
return nil |
|
||||
}, util.FullPath(loc.Path), visitFn) |
|
||||
|
|
||||
} |
|
||||
func (c *hdfsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) { |
|
||||
|
|
||||
f, err := c.client.Open(loc.Path) |
|
||||
if err != nil { |
|
||||
return |
|
||||
} |
|
||||
defer f.Close() |
|
||||
data = make([]byte, size) |
|
||||
_, err = f.ReadAt(data, offset) |
|
||||
|
|
||||
return |
|
||||
|
|
||||
} |
|
||||
|
|
||||
func (c *hdfsRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) { |
|
||||
return c.client.MkdirAll(loc.Path, os.FileMode(entry.Attributes.FileMode)) |
|
||||
} |
|
||||
|
|
||||
func (c *hdfsRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error) { |
|
||||
return c.client.RemoveAll(loc.Path) |
|
||||
} |
|
||||
|
|
||||
func (c *hdfsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) { |
|
||||
|
|
||||
dirname := path.Dir(loc.Path) |
|
||||
|
|
||||
// ensure parent directory
|
|
||||
if err = c.client.MkdirAll(dirname, 0755); err != nil { |
|
||||
return |
|
||||
} |
|
||||
|
|
||||
// remove existing file
|
|
||||
info, err := c.client.Stat(loc.Path) |
|
||||
if err == nil { |
|
||||
err = c.client.Remove(loc.Path) |
|
||||
if err != nil { |
|
||||
return |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
// create new file
|
|
||||
out, err := c.client.Create(loc.Path) |
|
||||
if err != nil { |
|
||||
return |
|
||||
} |
|
||||
|
|
||||
cleanup := func() { |
|
||||
if removeErr := c.client.Remove(loc.Path); removeErr != nil { |
|
||||
glog.Errorf("clean up %s%s: %v", loc.Name, loc.Path, removeErr) |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
if _, err = io.Copy(out, reader); err != nil { |
|
||||
cleanup() |
|
||||
return |
|
||||
} |
|
||||
|
|
||||
if err = out.Close(); err != nil { |
|
||||
cleanup() |
|
||||
return |
|
||||
} |
|
||||
|
|
||||
info, err = c.client.Stat(loc.Path) |
|
||||
if err != nil { |
|
||||
return |
|
||||
} |
|
||||
|
|
||||
return &filer_pb.RemoteEntry{ |
|
||||
RemoteMtime: info.ModTime().Unix(), |
|
||||
RemoteSize: info.Size(), |
|
||||
RemoteETag: "", |
|
||||
StorageName: c.conf.Name, |
|
||||
}, nil |
|
||||
|
|
||||
} |
|
||||
|
|
||||
func (c *hdfsRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error { |
|
||||
if oldEntry.Attributes.FileMode != newEntry.Attributes.FileMode { |
|
||||
if err := c.client.Chmod(loc.Path, os.FileMode(newEntry.Attributes.FileMode)); err != nil { |
|
||||
return err |
|
||||
} |
|
||||
} |
|
||||
return nil |
|
||||
} |
|
||||
|
|
||||
func (c *hdfsRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) { |
|
||||
if err = c.client.Remove(loc.Path); err != nil { |
|
||||
return fmt.Errorf("hdfs delete %s: %v", loc.Path, err) |
|
||||
} |
|
||||
return |
|
||||
} |
|
||||
|
|
||||
func (c *hdfsRemoteStorageClient) ListBuckets() (buckets []*remote_storage.Bucket, err error) { |
|
||||
return |
|
||||
} |
|
||||
|
|
||||
func (c *hdfsRemoteStorageClient) CreateBucket(name string) (err error) { |
|
||||
return |
|
||||
} |
|
||||
|
|
||||
func (c *hdfsRemoteStorageClient) DeleteBucket(name string) (err error) { |
|
||||
return |
|
||||
} |
|
Write
Preview
Loading…
Cancel
Save
Reference in new issue