Chris Lu
3 years ago
7 changed files with 500 additions and 142 deletions
2    other/java/client/src/main/proto/filer.proto
1    weed/command/imports.go
2    weed/pb/filer.proto
302  weed/pb/filer_pb/filer.pb.go
120  weed/remote_storage/azure/azure_highlevel.go
207  weed/remote_storage/azure/azure_storage_client.go
8    weed/shell/command_remote_configure.go
weed/remote_storage/azure/azure_highlevel.go
@@ -0,0 +1,120 @@
package azure

import (
    "context"
    "crypto/rand"
    "encoding/base64"
    "errors"
    "fmt"
    "github.com/Azure/azure-pipeline-go/pipeline"
    . "github.com/Azure/azure-storage-blob-go/azblob"
    "io"
    "sync"
)

// copied from https://github.com/Azure/azure-storage-blob-go/blob/master/azblob/highlevel.go#L73:6
// uploadReaderAtToBlockBlob was not public

// uploadReaderAtToBlockBlob uploads a buffer in blocks to a block blob.
func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSize int64,
    blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
    if o.BlockSize == 0 {
        // If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
        if readerSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
            return nil, errors.New("buffer is too large to upload to a block blob")
        }
        // If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
        if readerSize <= BlockBlobMaxUploadBlobBytes {
            o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
        } else {
            o.BlockSize = readerSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks
            if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
                o.BlockSize = BlobDefaultDownloadBlockSize
            }
            // StageBlock will be called with blockSize blocks and a Parallelism of (BufferSize / BlockSize).
        }
    }

    if readerSize <= BlockBlobMaxUploadBlobBytes {
        // If the size can fit in 1 Upload call, do it this way
        var body io.ReadSeeker = io.NewSectionReader(reader, 0, readerSize)
        if o.Progress != nil {
            body = pipeline.NewRequestBodyProgress(body, o.Progress)
        }
        return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions)
    }

    var numBlocks = uint16(((readerSize - 1) / o.BlockSize) + 1)

    blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
    progress := int64(0)
    progressLock := &sync.Mutex{}

    err := DoBatchTransfer(ctx, BatchTransferOptions{
        OperationName: "uploadReaderAtToBlockBlob",
        TransferSize:  readerSize,
        ChunkSize:     o.BlockSize,
        Parallelism:   o.Parallelism,
        Operation: func(offset int64, count int64, ctx context.Context) error {
            // This function is called once per block.
            // It is passed this block's offset within the buffer and its count of bytes
            // Prepare to read the proper block/section of the buffer
            var body io.ReadSeeker = io.NewSectionReader(reader, offset, count)
            blockNum := offset / o.BlockSize
            if o.Progress != nil {
                blockProgress := int64(0)
                body = pipeline.NewRequestBodyProgress(body,
                    func(bytesTransferred int64) {
                        diff := bytesTransferred - blockProgress
                        blockProgress = bytesTransferred
                        progressLock.Lock() // 1 goroutine at a time gets a progress report
                        progress += diff
                        o.Progress(progress)
                        progressLock.Unlock()
                    })
            }

            // Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
            // at the same time causing PutBlockList to get a mix of blocks from all the clients.
            blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
            _, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil, o.ClientProvidedKeyOptions)
            return err
        },
    })
    if err != nil {
        return nil, err
    }
    // All put blocks were successful, call Put Block List to finalize the blob
    return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions)
}

// The UUID reserved variants.
const (
    reservedNCS       byte = 0x80
    reservedRFC4122   byte = 0x40
    reservedMicrosoft byte = 0x20
    reservedFuture    byte = 0x00
)

type uuid [16]byte

// newUUID returns a new uuid using RFC 4122 algorithm.
func newUUID() (u uuid) {
    u = uuid{}
    // Set all bits to randomly (or pseudo-randomly) chosen values.
    rand.Read(u[:])
    u[8] = (u[8] | reservedRFC4122) & 0x7F // u.setVariant(ReservedRFC4122)

    var version byte = 4
    u[6] = (u[6] & 0xF) | (version << 4) // u.setVersion(4)
    return
}

// String returns an unparsed version of the generated UUID sequence.
func (u uuid) String() string {
    return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:])
}

func (u uuid) bytes() []byte {
    return u[:]
}
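For orientation, here is a minimal sketch (not part of this commit) of how this package-private helper can be driven with a local file. The BlockBlobURL is assumed to be built from a credentialed ServiceURL elsewhere, exactly as WriteFile does in azure_storage_client.go below; an *os.File satisfies io.ReaderAt, which is what the block-by-block path needs.

package azure

import (
    "context"
    "os"

    "github.com/Azure/azure-storage-blob-go/azblob"
)

// uploadLocalFile is a hypothetical caller of uploadReaderAtToBlockBlob above.
// blobURL is assumed to come from a credentialed azblob.ServiceURL.
func uploadLocalFile(ctx context.Context, blobURL azblob.BlockBlobURL, path string) error {
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    defer f.Close()

    info, err := f.Stat()
    if err != nil {
        return err
    }

    // Small files take the single Upload call inside the helper; larger ones
    // are staged as parallel blocks and finalized with CommitBlockList.
    _, err = uploadReaderAtToBlockBlob(ctx, f, info.Size(), blobURL,
        azblob.UploadToBlockBlobOptions{BlockSize: 4 * 1024 * 1024, Parallelism: 16})
    return err
}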
weed/remote_storage/azure/azure_storage_client.go
@@ -0,0 +1,207 @@
package azure

import (
    "context"
    "fmt"
    "github.com/Azure/azure-storage-blob-go/azblob"
    "github.com/chrislusf/seaweedfs/weed/filer"
    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    "github.com/chrislusf/seaweedfs/weed/remote_storage"
    "github.com/chrislusf/seaweedfs/weed/util"
    "io"
    "io/ioutil"
    "net/url"
    "os"
    "reflect"
)

func init() {
    remote_storage.RemoteStorageClientMakers["azure"] = new(azureRemoteStorageMaker)
}

type azureRemoteStorageMaker struct{}

func (s azureRemoteStorageMaker) Make(conf *filer_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {

    client := &azureRemoteStorageClient{
        conf: conf,
    }

    accountName, accountKey := conf.AzureAccountName, conf.AzureAccountKey
    if len(accountName) == 0 || len(accountKey) == 0 {
        accountName, accountKey = os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY")
        if len(accountName) == 0 || len(accountKey) == 0 {
            return nil, fmt.Errorf("either AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set")
        }
    }

    // Use your Storage account's name and key to create a credential object.
    credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
    if err != nil {
        return nil, fmt.Errorf("invalid Azure credential with account name:%s: %v", accountName, err)
    }

    // Create a request pipeline that is used to process HTTP(S) requests and responses.
    p := azblob.NewPipeline(credential, azblob.PipelineOptions{})

    // Create a ServiceURL object that wraps the service URL and a request pipeline.
    u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName))
    client.serviceURL = azblob.NewServiceURL(*u, p)

    return client, nil
}

type azureRemoteStorageClient struct {
    conf       *filer_pb.RemoteConf
    serviceURL azblob.ServiceURL
}

var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{})

func (az *azureRemoteStorageClient) Traverse(loc *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {

    pathKey := loc.Path[1:]
    containerURL := az.serviceURL.NewContainerURL(loc.Bucket)

    // List the container that we have created above
    for marker := (azblob.Marker{}); marker.NotDone(); {
        // Get a result segment starting with the blob indicated by the current Marker.
        listBlob, err := containerURL.ListBlobsFlatSegment(context.Background(), marker, azblob.ListBlobsSegmentOptions{
            Prefix: pathKey,
        })
        if err != nil {
            return fmt.Errorf("azure traverse %s%s: %v", loc.Bucket, loc.Path, err)
        }

        // ListBlobs returns the start of the next segment; you MUST use this to get
        // the next segment (after processing the current result segment).
        marker = listBlob.NextMarker

        // Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute)
        for _, blobInfo := range listBlob.Segment.BlobItems {
            key := blobInfo.Name
            key = "/" + key
            dir, name := util.FullPath(key).DirAndName()
            err = visitFn(dir, name, false, &filer_pb.RemoteEntry{
                RemoteMtime: blobInfo.Properties.LastModified.Unix(),
                RemoteSize:  *blobInfo.Properties.ContentLength,
                RemoteETag:  string(blobInfo.Properties.Etag),
                StorageName: az.conf.Name,
            })
            if err != nil {
                return fmt.Errorf("azure processing %s%s: %v", loc.Bucket, loc.Path, err)
            }
        }
    }

    return
}
func (az *azureRemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {

    key := loc.Path[1:]
    containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
    blobURL := containerURL.NewBlockBlobURL(key)

    downloadResponse, readErr := blobURL.Download(context.Background(), offset, size, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
    if readErr != nil {
        return nil, readErr
    }
    // NOTE: automatic retries are performed if the connection fails
    bodyStream := downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
    defer bodyStream.Close()

    data, err = ioutil.ReadAll(bodyStream)

    if err != nil {
        return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
    }

    return
}

func (az *azureRemoteStorageClient) WriteDirectory(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
    return nil
}

func (az *azureRemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {

    key := loc.Path[1:]
    containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
    blobURL := containerURL.NewBlockBlobURL(key)

    readerAt, ok := reader.(io.ReaderAt)
    if !ok {
        return nil, fmt.Errorf("unexpected reader: readerAt expected")
    }
    fileSize := int64(filer.FileSize(entry))

    _, err = uploadReaderAtToBlockBlob(context.Background(), readerAt, fileSize, blobURL, azblob.UploadToBlockBlobOptions{
        BlockSize:   4 * 1024 * 1024,
        Parallelism: 16})
    if err != nil {
        return nil, fmt.Errorf("azure upload to %s%s: %v", loc.Bucket, loc.Path, err)
    }

    metadata := toMetadata(entry.Extended)
    if len(metadata) > 0 {
        _, err = blobURL.SetMetadata(context.Background(), metadata, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
        if err != nil {
            return nil, fmt.Errorf("azure set metadata on %s%s: %v", loc.Bucket, loc.Path, err)
        }
    }

    // read back the remote entry
    return az.readFileRemoteEntry(loc)

}

func (az *azureRemoteStorageClient) readFileRemoteEntry(loc *filer_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
    key := loc.Path[1:]
    containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
    blobURL := containerURL.NewBlockBlobURL(key)

    attr, err := blobURL.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})

    if err != nil {
        return nil, err
    }

    return &filer_pb.RemoteEntry{
        RemoteMtime: attr.LastModified().Unix(),
        RemoteSize:  attr.ContentLength(),
        RemoteETag:  string(attr.ETag()),
        StorageName: az.conf.Name,
    }, nil

}

func toMetadata(attributes map[string][]byte) map[string]string {
    metadata := make(map[string]string)
    for k, v := range attributes {
        metadata[k] = string(v)
    }
    return metadata
}

func (az *azureRemoteStorageClient) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) {
    if reflect.DeepEqual(oldEntry.Extended, newEntry.Extended) {
        return nil
    }
    metadata := toMetadata(newEntry.Extended)

    key := loc.Path[1:]
    containerURL := az.serviceURL.NewContainerURL(loc.Bucket)

    _, err = containerURL.NewBlobURL(key).SetMetadata(context.Background(), metadata, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})

    return
}

func (az *azureRemoteStorageClient) DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error) {
    key := loc.Path[1:]
    containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
    if _, err = containerURL.NewBlobURL(key).Delete(context.Background(),
        azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
        return fmt.Errorf("azure delete %s%s: %v", loc.Bucket, loc.Path, err)
    }
    return
}
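As a rough end-to-end sketch (also not part of this commit; the account, key, container, and path are placeholders, and ReadFile is assumed to be part of the remote_storage.RemoteStorageClient interface, as the interface assertion above suggests), the maker registered in init() can be looked up by its "azure" key and used like this:

package main

import (
    "log"

    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    "github.com/chrislusf/seaweedfs/weed/remote_storage"
    _ "github.com/chrislusf/seaweedfs/weed/remote_storage/azure" // registers the "azure" maker via init()
)

func main() {
    maker := remote_storage.RemoteStorageClientMakers["azure"]
    client, err := maker.Make(&filer_pb.RemoteConf{
        Name:             "azure1",    // placeholder remote storage name
        AzureAccountName: "myaccount", // placeholder; empty values fall back to AZURE_STORAGE_ACCOUNT
        AzureAccountKey:  "bXlrZXk=",  // placeholder; empty values fall back to AZURE_STORAGE_ACCESS_KEY
    })
    if err != nil {
        log.Fatal(err)
    }

    // Read the first 16 bytes of a blob; Bucket maps to the Azure container.
    loc := &filer_pb.RemoteStorageLocation{Bucket: "container1", Path: "/dir/hello.txt"}
    data, err := client.ReadFile(loc, 0, 16)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("read %d bytes", len(data))
}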