120 lines
4.4 KiB

3 years ago
3 years ago
  1. package azure
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "encoding/base64"
  6. "errors"
  7. "fmt"
  8. "github.com/Azure/azure-pipeline-go/pipeline"
  9. . "github.com/Azure/azure-storage-blob-go/azblob"
  10. "io"
  11. "sync"
  12. )
  13. // copied from https://github.com/Azure/azure-storage-blob-go/blob/master/azblob/highlevel.go#L73:6
  14. // uploadReaderAtToBlockBlob was not public
  15. // uploadReaderAtToBlockBlob uploads a buffer in blocks to a block blob.
  16. func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSize int64,
  17. blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
  18. if o.BlockSize == 0 {
  19. // If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
  20. if readerSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
  21. return nil, errors.New("buffer is too large to upload to a block blob")
  22. }
  23. // If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
  24. if readerSize <= BlockBlobMaxUploadBlobBytes {
  25. o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
  26. } else {
  27. o.BlockSize = readerSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks
  28. if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
  29. o.BlockSize = BlobDefaultDownloadBlockSize
  30. }
  31. // StageBlock will be called with blockSize blocks and a Parallelism of (BufferSize / BlockSize).
  32. }
  33. }
  34. if readerSize <= BlockBlobMaxUploadBlobBytes {
  35. // If the size can fit in 1 Upload call, do it this way
  36. var body io.ReadSeeker = io.NewSectionReader(reader, 0, readerSize)
  37. if o.Progress != nil {
  38. body = pipeline.NewRequestBodyProgress(body, o.Progress)
  39. }
  40. return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions, o.ImmutabilityPolicyOptions)
  41. }
  42. var numBlocks = uint16(((readerSize - 1) / o.BlockSize) + 1)
  43. blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
  44. progress := int64(0)
  45. progressLock := &sync.Mutex{}
  46. err := DoBatchTransfer(ctx, BatchTransferOptions{
  47. OperationName: "uploadReaderAtToBlockBlob",
  48. TransferSize: readerSize,
  49. ChunkSize: o.BlockSize,
  50. Parallelism: o.Parallelism,
  51. Operation: func(offset int64, count int64, ctx context.Context) error {
  52. // This function is called once per block.
  53. // It is passed this block's offset within the buffer and its count of bytes
  54. // Prepare to read the proper block/section of the buffer
  55. var body io.ReadSeeker = io.NewSectionReader(reader, offset, count)
  56. blockNum := offset / o.BlockSize
  57. if o.Progress != nil {
  58. blockProgress := int64(0)
  59. body = pipeline.NewRequestBodyProgress(body,
  60. func(bytesTransferred int64) {
  61. diff := bytesTransferred - blockProgress
  62. blockProgress = bytesTransferred
  63. progressLock.Lock() // 1 goroutine at a time gets a progress report
  64. progress += diff
  65. o.Progress(progress)
  66. progressLock.Unlock()
  67. })
  68. }
  69. // Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
  70. // at the same time causing PutBlockList to get a mix of blocks from all the clients.
  71. blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
  72. _, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil, o.ClientProvidedKeyOptions)
  73. return err
  74. },
  75. })
  76. if err != nil {
  77. return nil, err
  78. }
  79. // All put blocks were successful, call Put Block List to finalize the blob
  80. return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions, o.ImmutabilityPolicyOptions)
  81. }
  82. // The UUID reserved variants.
  83. const (
  84. reservedNCS byte = 0x80
  85. reservedRFC4122 byte = 0x40
  86. reservedMicrosoft byte = 0x20
  87. reservedFuture byte = 0x00
  88. )
  89. type uuid [16]byte
  90. // NewUUID returns a new uuid using RFC 4122 algorithm.
  91. func newUUID() (u uuid) {
  92. u = uuid{}
  93. // Set all bits to randomly (or pseudo-randomly) chosen values.
  94. rand.Read(u[:])
  95. u[8] = (u[8] | reservedRFC4122) & 0x7F // u.setVariant(ReservedRFC4122)
  96. var version byte = 4
  97. u[6] = (u[6] & 0xF) | (version << 4) // u.setVersion(4)
  98. return
  99. }
  100. // String returns an unparsed version of the generated UUID sequence.
  101. func (u uuid) String() string {
  102. return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:])
  103. }
  104. func (u uuid) bytes() []byte {
  105. return u[:]
  106. }