Browse Source

Merge branch 'master' into bptree

pull/2354/head
Chris Lu 3 years ago
parent
commit
b3d88180ca
  1. 111
      .github/workflows/binaries_dev.yml
  2. 4
      k8s/helm_charts2/Chart.yaml
  3. 4
      weed/command/filer_remote_gateway.go
  4. 11
      weed/command/filer_remote_sync.go
  5. 5
      weed/s3api/s3api_bucket_handlers.go
  6. 4
      weed/s3api/s3api_object_copy_handlers.go
  7. 7
      weed/s3api/s3api_object_handlers.go
  8. 3
      weed/s3api/s3api_object_handlers_postpolicy.go
  9. 37
      weed/s3api/s3api_object_skip_handlers.go
  10. 3
      weed/s3api/s3api_object_tagging_handlers.go
  11. 2
      weed/s3api/s3api_objects_list_handlers.go
  12. 9
      weed/s3api/s3api_server.go
  13. 1
      weed/server/filer_server_handlers_read.go
  14. 2
      weed/shell/commands.go
  15. 2
      weed/util/constants.go
  16. 12
      weed/wdclient/exclusive_locks/exclusive_locker.go

111
.github/workflows/binaries_dev.yml

@ -6,57 +6,72 @@ on:
jobs: jobs:
cleanup:
runs-on: ubuntu-latest
build-latest-docker-image:
runs-on: [ubuntu-latest]
steps:
- name: Delete old release assets
uses: mknejp/delete-release-assets@v1
with:
token: ${{ github.token }}
tag: dev
fail-if-no-assets: false
assets: |
weed-*
build_dev:
needs: cleanup
runs-on: ubuntu-latest
strategy:
matrix:
goos: [linux, windows, darwin, freebsd]
goarch: [amd64, arm, arm64]
exclude:
- goarch: arm
goos: darwin
- goarch: 386
goos: darwin
- goarch: arm
goos: windows
- goarch: arm64
goos: windows
steps: steps:
-
name: Checkout
- name: Check out code into the Go module directory
uses: actions/checkout@v2 uses: actions/checkout@v2
-
name: Docker meta
id: docker_meta
uses: docker/metadata-action@v3
with:
images: |
chrislusf/seaweedfs
ghcr.io/chrislusf/seaweedfs
tags: |
type=raw,value=latest
labels: |
org.opencontainers.image.title=seaweedfs
org.opencontainers.image.vendor=Chris Lu
-
name: Set up QEMU
uses: docker/setup-qemu-action@v1
-
name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
with:
buildkitd-flags: "--debug"
-
name: Login to Docker Hub
if: github.event_name != 'pull_request'
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
-
name: Login to GHCR
if: github.event_name != 'pull_request'
uses: docker/login-action@v1
- name: Set BUILD_TIME env
run: echo BUILD_TIME=$(date -u +%Y%m%d-%H%M) >> ${GITHUB_ENV}
- name: Go Release Binaries Large Disk
uses: wangyoucao577/go-release-action@v1.20
with: with:
registry: ghcr.io
username: ${{ secrets.GHCR_USERNAME }}
password: ${{ secrets.GHCR_TOKEN }}
-
name: Build
uses: docker/build-push-action@v2
github_token: ${{ secrets.GITHUB_TOKEN }}
goos: ${{ matrix.goos }}
goarch: ${{ matrix.goarch }}
release_tag: dev
overwrite: true
pre_command: export CGO_ENABLED=0
build_flags: -tags 5BytesOffset # optional, default is
ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .`
project_path: weed
binary_name: weed-large-disk
asset_name: "weed-large-disk-${{ env.BUILD_TIME }}-${{ matrix.goos }}-${{ matrix.goarch }}"
- name: Go Release Binaries Normal Volume Size
uses: wangyoucao577/go-release-action@v1.20
with: with:
context: ./docker
push: ${{ github.event_name != 'pull_request' }}
file: ./docker/Dockerfile
platforms: linux/amd64, linux/arm, linux/arm64
tags: ${{ steps.docker_meta.outputs.tags }}
labels: ${{ steps.docker_meta.outputs.labels }}
github_token: ${{ secrets.GITHUB_TOKEN }}
goos: ${{ matrix.goos }}
goarch: ${{ matrix.goarch }}
release_tag: dev
overwrite: true
pre_command: export CGO_ENABLED=0
ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}}
# Where to run `go build .`
project_path: weed
binary_name: weed-normal-disk
asset_name: "weed-${{ env.BUILD_TIME }}-${{ matrix.goos }}-${{ matrix.goarch }}"

4
k8s/helm_charts2/Chart.yaml

@ -1,5 +1,5 @@
apiVersion: v1 apiVersion: v1
description: SeaweedFS description: SeaweedFS
name: seaweedfs name: seaweedfs
appVersion: "2.68"
version: "2.68"
appVersion: "2.69"
version: "2.69"

4
weed/command/filer_remote_gateway.go

@ -22,8 +22,8 @@ type RemoteGatewayOptions struct {
timeAgo *time.Duration timeAgo *time.Duration
createBucketAt *string createBucketAt *string
createBucketRandomSuffix *bool createBucketRandomSuffix *bool
include *string
exclude *string
include *string
exclude *string
mappings *remote_pb.RemoteStorageMapping mappings *remote_pb.RemoteStorageMapping
remoteConfs map[string]*remote_pb.RemoteConf remoteConfs map[string]*remote_pb.RemoteConf

11
weed/command/filer_remote_sync.go

@ -13,12 +13,11 @@ import (
) )
type RemoteSyncOptions struct { type RemoteSyncOptions struct {
filerAddress *string
grpcDialOption grpc.DialOption
readChunkFromFiler *bool
timeAgo *time.Duration
dir *string
filerAddress *string
grpcDialOption grpc.DialOption
readChunkFromFiler *bool
timeAgo *time.Duration
dir *string
} }
var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) var _ = filer_pb.FilerClient(&RemoteSyncOptions{})

5
weed/s3api/s3api_bucket_handlers.go

@ -27,6 +27,8 @@ type ListAllMyBucketsResult struct {
func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Request) { func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
glog.V(3).Infof("ListBucketsHandler")
var identity *Identity var identity *Identity
var s3Err s3err.ErrorCode var s3Err s3err.ErrorCode
if s3a.iam.isEnabled() { if s3a.iam.isEnabled() {
@ -75,6 +77,7 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) { func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := getBucketAndObject(r) bucket, _ := getBucketAndObject(r)
glog.V(3).Infof("PutBucketHandler %s", bucket)
// avoid duplicated buckets // avoid duplicated buckets
errCode := s3err.ErrNone errCode := s3err.ErrNone
@ -128,6 +131,7 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) { func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := getBucketAndObject(r) bucket, _ := getBucketAndObject(r)
glog.V(3).Infof("DeleteBucketHandler %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, err, r) s3err.WriteErrorResponse(w, err, r)
@ -162,6 +166,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) { func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := getBucketAndObject(r) bucket, _ := getBucketAndObject(r)
glog.V(3).Infof("HeadBucketHandler %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, err, r) s3err.WriteErrorResponse(w, err, r)

4
weed/s3api/s3api_object_copy_handlers.go

@ -27,6 +27,8 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
srcBucket, srcObject := pathToBucketAndObject(cpSrcPath) srcBucket, srcObject := pathToBucketAndObject(cpSrcPath)
glog.V(3).Infof("CopyObjectHandler %s %s => %s %s", srcBucket, srcObject, dstBucket, dstObject)
if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && isReplace(r) { if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && isReplace(r) {
fullPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)) fullPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
dir, name := fullPath.DirAndName() dir, name := fullPath.DirAndName()
@ -139,6 +141,8 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
return return
} }
glog.V(3).Infof("CopyObjectPartHandler %s %s => %s part %d", srcBucket, srcObject, dstBucket, partID)
// check partID with maximum part ID for multipart objects // check partID with maximum part ID for multipart objects
if partID > globalMaxPartID { if partID > globalMaxPartID {
s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxParts, r) s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxParts, r)

7
weed/s3api/s3api_object_handlers.go

@ -41,6 +41,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
bucket, object := getBucketAndObject(r) bucket, object := getBucketAndObject(r)
glog.V(3).Infof("PutObjectHandler %s %s", bucket, object)
_, err := validateContentMd5(r.Header) _, err := validateContentMd5(r.Header)
if err != nil { if err != nil {
@ -118,6 +119,7 @@ func urlPathEscape(object string) string {
func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) { func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := getBucketAndObject(r) bucket, object := getBucketAndObject(r)
glog.V(3).Infof("GetObjectHandler %s %s", bucket, object)
if strings.HasSuffix(r.URL.Path, "/") { if strings.HasSuffix(r.URL.Path, "/") {
s3err.WriteErrorResponse(w, s3err.ErrNotImplemented, r) s3err.WriteErrorResponse(w, s3err.ErrNotImplemented, r)
@ -134,6 +136,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := getBucketAndObject(r) bucket, object := getBucketAndObject(r)
glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object)
destUrl := fmt.Sprintf("http://%s%s/%s%s", destUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlPathEscape(object)) s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlPathEscape(object))
@ -145,6 +148,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := getBucketAndObject(r) bucket, object := getBucketAndObject(r)
glog.V(3).Infof("DeleteObjectHandler %s %s", bucket, object)
destUrl := fmt.Sprintf("http://%s%s/%s%s?recursive=true", destUrl := fmt.Sprintf("http://%s%s/%s%s?recursive=true",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlPathEscape(object)) s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlPathEscape(object))
@ -192,6 +196,7 @@ type DeleteObjectsResponse struct {
func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) { func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := getBucketAndObject(r) bucket, _ := getBucketAndObject(r)
glog.V(3).Infof("DeleteMultipleObjectsHandler %s", bucket)
deleteXMLBytes, err := ioutil.ReadAll(r.Body) deleteXMLBytes, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
@ -291,7 +296,7 @@ var passThroughHeaders = []string{
func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResponse *http.Response, w http.ResponseWriter)) { func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResponse *http.Response, w http.ResponseWriter)) {
glog.V(2).Infof("s3 proxying %s to %s", r.Method, destUrl)
glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl)
proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body) proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)

3
weed/s3api/s3api_object_handlers_postpolicy.go

@ -5,6 +5,7 @@ import (
"encoding/base64" "encoding/base64"
"errors" "errors"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/s3api/policy" "github.com/chrislusf/seaweedfs/weed/s3api/policy"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err" "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
@ -24,6 +25,8 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
bucket := mux.Vars(r)["bucket"] bucket := mux.Vars(r)["bucket"]
glog.V(3).Infof("PostPolicyBucketHandler %s", bucket)
reader, err := r.MultipartReader() reader, err := r.MultipartReader()
if err != nil { if err != nil {
s3err.WriteErrorResponse(w, s3err.ErrMalformedPOSTRequest, r) s3err.WriteErrorResponse(w, s3err.ErrMalformedPOSTRequest, r)

37
weed/s3api/s3api_object_skip_handlers.go

@ -0,0 +1,37 @@
package s3api
import (
"net/http"
)
// PutObjectAclHandler Put object ACL
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectAcl.html
func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
// PutObjectRetentionHandler Put object Retention
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectRetention.html
func (s3a *S3ApiServer) PutObjectRetentionHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
// PutObjectLegalHoldHandler Put object Legal Hold
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectLegalHold.html
func (s3a *S3ApiServer) PutObjectLegalHoldHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
// PutObjectLockConfigurationHandler Put object Lock configuration
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectLockConfiguration.html
func (s3a *S3ApiServer) PutObjectLockConfigurationHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

3
weed/s3api/s3api_object_tagging_handlers.go

@ -17,6 +17,7 @@ import (
func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request) { func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := getBucketAndObject(r) bucket, object := getBucketAndObject(r)
glog.V(3).Infof("GetObjectTaggingHandler %s %s", bucket, object)
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)) target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
dir, name := target.DirAndName() dir, name := target.DirAndName()
@ -42,6 +43,7 @@ func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.R
func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request) { func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := getBucketAndObject(r) bucket, object := getBucketAndObject(r)
glog.V(3).Infof("PutObjectTaggingHandler %s %s", bucket, object)
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)) target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
dir, name := target.DirAndName() dir, name := target.DirAndName()
@ -97,6 +99,7 @@ func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.R
func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Request) { func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := getBucketAndObject(r) bucket, object := getBucketAndObject(r)
glog.V(3).Infof("DeleteObjectTaggingHandler %s %s", bucket, object)
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)) target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
dir, name := target.DirAndName() dir, name := target.DirAndName()

2
weed/s3api/s3api_objects_list_handlers.go

@ -40,6 +40,7 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
// collect parameters // collect parameters
bucket, _ := getBucketAndObject(r) bucket, _ := getBucketAndObject(r)
glog.V(3).Infof("ListObjectsV2Handler %s", bucket)
originalPrefix, continuationToken, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query()) originalPrefix, continuationToken, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query())
@ -95,6 +96,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
// collect parameters // collect parameters
bucket, _ := getBucketAndObject(r) bucket, _ := getBucketAndObject(r)
glog.V(3).Infof("ListObjectsV1Handler %s", bucket)
originalPrefix, marker, delimiter, maxKeys := getListObjectsV1Args(r.URL.Query()) originalPrefix, marker, delimiter, maxKeys := getListObjectsV1Args(r.URL.Query())

9
weed/s3api/s3api_server.go

@ -90,6 +90,15 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
// DeleteObjectTagging // DeleteObjectTagging
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING), "DELETE")).Queries("tagging", "") bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING), "DELETE")).Queries("tagging", "")
// PutObjectACL
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectAclHandler, ACTION_WRITE), "PUT")).Queries("acl", "")
// PutObjectRetention
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectRetentionHandler, ACTION_WRITE), "PUT")).Queries("retention", "")
// PutObjectLegalHold
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectLegalHoldHandler, ACTION_WRITE), "PUT")).Queries("legal-hold", "")
// PutObjectLockConfiguration
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectLockConfigurationHandler, ACTION_WRITE), "PUT")).Queries("object-lock", "")
// CopyObject // CopyObject
bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectHandler, ACTION_WRITE), "COPY")) bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectHandler, ACTION_WRITE), "COPY"))
// PutObject // PutObject

1
weed/server/filer_server_handlers_read.go

@ -177,6 +177,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
Directory: dir, Directory: dir,
Name: name, Name: name,
}); err != nil { }); err != nil {
glog.Errorf("DownloadToLocal %s: %v", entry.FullPath, err)
return fmt.Errorf("cache %s: %v", entry.FullPath, err) return fmt.Errorf("cache %s: %v", entry.FullPath, err)
} else { } else {
chunks = resp.Entry.Chunks chunks = resp.Entry.Chunks

2
weed/shell/commands.go

@ -49,7 +49,7 @@ func NewCommandEnv(options ShellOptions) *CommandEnv {
MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddresses()), MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddresses()),
option: options, option: options,
} }
ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient)
ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "admin")
return ce return ce
} }

2
weed/util/constants.go

@ -5,7 +5,7 @@ import (
) )
var ( var (
VERSION_NUMBER = fmt.Sprintf("%.02f", 2.68)
VERSION_NUMBER = fmt.Sprintf("%.02f", 2.69)
VERSION = sizeLimit + " " + VERSION_NUMBER VERSION = sizeLimit + " " + VERSION_NUMBER
COMMIT = "" COMMIT = ""
) )

12
weed/wdclient/exclusive_locks/exclusive_locker.go

@ -14,7 +14,6 @@ const (
RenewInteval = 4 * time.Second RenewInteval = 4 * time.Second
SafeRenewInteval = 3 * time.Second SafeRenewInteval = 3 * time.Second
InitLockInteval = 1 * time.Second InitLockInteval = 1 * time.Second
AdminLockName = "admin"
) )
type ExclusiveLocker struct { type ExclusiveLocker struct {
@ -22,13 +21,16 @@ type ExclusiveLocker struct {
lockTsNs int64 lockTsNs int64
isLocking bool isLocking bool
masterClient *wdclient.MasterClient masterClient *wdclient.MasterClient
lockName string
} }
func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {
func NewExclusiveLocker(masterClient *wdclient.MasterClient, lockName string) *ExclusiveLocker {
return &ExclusiveLocker{ return &ExclusiveLocker{
masterClient: masterClient, masterClient: masterClient,
lockName: lockName,
} }
} }
func (l *ExclusiveLocker) IsLocking() bool { func (l *ExclusiveLocker) IsLocking() bool {
return l.isLocking return l.isLocking
} }
@ -55,7 +57,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) {
resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{ resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token), PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
LockName: l.lockName,
ClientName: clientName, ClientName: clientName,
}) })
if err == nil { if err == nil {
@ -83,7 +85,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) {
resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{ resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token), PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
LockName: l.lockName,
ClientName: clientName, ClientName: clientName,
}) })
if err == nil { if err == nil {
@ -114,7 +116,7 @@ func (l *ExclusiveLocker) ReleaseLock() {
client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{ client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token), PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
LockName: l.lockName,
}) })
return nil return nil
}) })

Loading…
Cancel
Save