diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 71143f307..e74ea7d93 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -466,7 +466,7 @@ func detectMimeType(f *os.File) string { func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(ctx context.Context, clientConn *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(clientConn) return fn(client) }, filerAddress, grpcDialOption) diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go index 1c5af7fe2..af804b909 100644 --- a/weed/filer2/filer_client_util.go +++ b/weed/filer2/filer_client_util.go @@ -22,7 +22,7 @@ func VolumeId(fileId string) string { } type FilerClient interface { - WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error + WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error } func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) { @@ -33,7 +33,7 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath F vid2Locations := make(map[string]*filer_pb.Locations) - err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { glog.V(4).Infof("read fh lookup volume id locations: %v", vids) resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ @@ -97,7 +97,7 @@ func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath FullPat dir, name := fullFilePath.DirAndName() - err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, @@ -128,7 +128,7 @@ func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath FullPat func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) { - err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { lastEntryName := "" diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index fe6b30619..abe5a21a6 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -128,7 +128,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, } glog.V(1).Infof("create: %v", req.String()) - if err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + if err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { if err := filer_pb.CreateEntry(ctx, client, request); err != nil { if strings.Contains(err.Error(), "EEXIST") { return fuse.EEXIST @@ -139,11 +139,13 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, }); err != nil { return nil, nil, err } - node := dir.newFile(req.Name, request.Entry) + var node fs.Node if request.Entry.IsDirectory { + node = dir.newDirectory(filer2.NewFullPath(dir.Path, req.Name), request.Entry) return node, nil, nil } + node = dir.newFile(req.Name, request.Entry) file := node.(*File) file.isOpen++ fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid) @@ -165,7 +167,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err }, } - err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: dir.Path, @@ -279,7 +281,7 @@ func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) erro dir.wfs.cacheDelete(filePath) - return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ Directory: dir.Path, @@ -303,7 +305,7 @@ func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error dir.wfs.cacheDelete(filer2.NewFullPath(dir.Path, req.Name)) - return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ Directory: dir.Path, @@ -427,7 +429,7 @@ func (dir *Dir) saveEntry(ctx context.Context) error { parentDir, name := filer2.FullPath(dir.Path).DirAndName() - return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: parentDir, diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index 13be62670..8b7ec7e89 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -35,7 +35,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, }, } - err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { if err := filer_pb.CreateEntry(ctx, client, request); err != nil { glog.V(0).Infof("symlink %s/%s: %v", dir.Path, req.NewName, err) return fuse.EIO diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 8db879d2c..4eb3c15b5 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -15,7 +15,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector newDir := newDirectory.(*Dir) glog.V(4).Infof("dir Rename %s/%s => %s/%s", dir.Path, req.OldName, newDir.Path, req.NewName) - err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: dir.Path, diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 7a41e371e..5ff128323 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -140,7 +140,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, reader io. var fileId, host string var auth security.EncodedJwt - if err := pages.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + if err := pages.f.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, diff --git a/weed/filesys/file.go b/weed/filesys/file.go index cc0717f18..eccef4e58 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -257,7 +257,7 @@ func (file *File) setEntry(entry *filer_pb.Entry) { } func (file *File) saveEntry(ctx context.Context) error { - return file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return file.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: file.dir.Path, diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index fad5418e2..ef0243090 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -169,7 +169,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return nil } - err = fh.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = fh.f.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { if fh.f.entry.Attributes != nil { fh.f.entry.Attributes.Mime = fh.contentType diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index bc78a0dbe..af1102b45 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -88,22 +88,22 @@ func (wfs *WFS) Root() (fs.Node, error) { return wfs.root, nil } -func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { +func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error { - err := util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + err := util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) + return fn(ctx2, client) }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) if err == nil { return nil } if strings.Contains(err.Error(), "context canceled") { - time.Sleep(1337 * time.Millisecond) + time.Sleep(3337 * time.Millisecond) glog.V(2).Infoln("retry context canceled request...") - return util.WithCachedGrpcClient(context.Background(), func(grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(context.Background(), func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) + return fn(ctx2, client) }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) } return err @@ -163,7 +163,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. if wfs.stats.lastChecked < time.Now().Unix()-20 { - err := wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err := wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.StatisticsRequest{ Collection: wfs.option.Collection, diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index 52c275e26..cce0c792c 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -20,7 +20,7 @@ func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChu fileIds = append(fileIds, chunk.GetFileIdString()) } - wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { deleteFileIds(ctx, wfs.option.GrpcDialOption, client, fileIds) return nil }) diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go index 75ba0f2ba..9dfb491fd 100644 --- a/weed/filesys/xattr.go +++ b/weed/filesys/xattr.go @@ -117,7 +117,7 @@ func (wfs *WFS) maybeLoadEntry(ctx context.Context, dir, name string) (entry *fi } // glog.V(3).Infof("read entry cache miss %s", fullpath) - err = wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 2dfa44483..b67d8b708 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -44,7 +44,7 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum continue } - lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: primaryRequest.Count, diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index e4aa6c6d3..95bbde9f9 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -117,7 +117,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str // DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) { - err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { req := &volume_server_pb.BatchDeleteRequest{ FileIds: fileIds, diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index f6b2b69e9..e7ee2d2ba 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -12,7 +12,7 @@ import ( "strings" ) -func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { +func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(context.Context, volume_server_pb.VolumeServerClient) error) error { ctx := context.Background() @@ -21,9 +21,9 @@ func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, return err } - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) - return fn(client) + return fn(ctx2, client) }, grpcAddress, grpcDialOption) } @@ -38,7 +38,7 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil } -func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { +func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(ctx2 context.Context, masterClient master_pb.SeaweedClient) error) error { ctx := context.Background() @@ -47,9 +47,9 @@ func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr) } - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) - return fn(client) + return fn(ctx2, client) }, masterGrpcAddress, grpcDialOption) } diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index d0773e7fd..78769ac5a 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -99,12 +99,12 @@ func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []strin //only query unknown_vids - err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err := WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupVolumeRequest{ VolumeIds: unknown_vids, } - resp, grpcErr := masterClient.LookupVolume(context.Background(), req) + resp, grpcErr := masterClient.LookupVolume(ctx, req) if grpcErr != nil { return grpcErr } diff --git a/weed/operation/stats.go b/weed/operation/stats.go index b69a33750..3e6327f19 100644 --- a/weed/operation/stats.go +++ b/weed/operation/stats.go @@ -9,9 +9,9 @@ import ( func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) { - err = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error { - grpcResponse, grpcErr := masterClient.Statistics(context.Background(), req) + grpcResponse, grpcErr := masterClient.Statistics(ctx, req) if grpcErr != nil { return grpcErr } diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go index 5562f12ab..4b39ad544 100644 --- a/weed/operation/sync_volume.go +++ b/weed/operation/sync_volume.go @@ -8,9 +8,9 @@ import ( func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { - WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + WithVolumeServerClient(server, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { - resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{ + resp, err = client.VolumeSyncStatus(ctx, &volume_server_pb.VolumeSyncStatusRequest{ VolumeId: vid, }) return nil diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index b53f18ce1..1e8b0a16e 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -26,9 +26,9 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.Volume } func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error { - return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { - stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{ + stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{ VolumeId: uint32(vid), SinceNs: sinceNs, IdleTimeoutSeconds: uint32(idleTimeoutSeconds), diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 97e9671a3..26c055da5 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -63,7 +63,7 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi var host string var auth security.EncodedJwt - if err := fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + if err := fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -104,11 +104,11 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi return } -func (fs *FilerSink) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *FilerSink) withFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(ctx context.Context, grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) + return fn(ctx, client) }, fs.grpcAddress, fs.grpcDialOption) } diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 58a3ab9c2..4790d1562 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -64,7 +64,7 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string, } func (fs *FilerSink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error { - return fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { dir, name := filer2.FullPath(key).DirAndName() @@ -87,7 +87,7 @@ func (fs *FilerSink) DeleteEntry(ctx context.Context, key string, isDirectory, d func (fs *FilerSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error { - return fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { dir, name := filer2.FullPath(key).DirAndName() @@ -139,7 +139,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file // read existing entry var existingEntry *filer_pb.Entry - err = fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, @@ -191,7 +191,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file } // save updated meta data - return true, fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return true, fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: newParentPath, diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index d7b5ebc4d..aef13be75 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -45,7 +45,7 @@ func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrl s vid := volumeId(part) - err = fs.withFilerClient(ctx, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = fs.withFilerClient(ctx, fs.grpcDialOption, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { glog.V(4).Infof("read lookup volume id locations: %v", vid) resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ @@ -89,11 +89,11 @@ func (fs *FilerSource) ReadPart(ctx context.Context, part string) (filename stri return filename, header, readCloser, err } -func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) + return fn(ctx2, client) }, fs.grpcAddress, fs.grpcDialOption) } diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index 127be07e3..602f03e5c 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -39,7 +39,7 @@ func encodeResponse(response interface{}) []byte { func (s3a *S3ApiServer) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(ctx context.Context, grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index c703b8c6f..3a2eca6d4 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -124,8 +124,8 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) { } func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) { - err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { - resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error { + resp, err := masterClient.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err) } diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go index f8e0785f6..f02b0f242 100644 --- a/weed/server/master_grpc_server_collection.go +++ b/weed/server/master_grpc_server_collection.go @@ -57,8 +57,8 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error { } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ + err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { + _, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) return deleteErr @@ -77,8 +77,8 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error { listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName) for _, server := range listOfEcServers { - err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ + err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { + _, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) return deleteErr diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 2965a4863..44a04cb86 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -25,8 +25,8 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ + err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { + _, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{ Collection: collection.Name, }) return deleteErr diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 0153d5efc..6d74f8171 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -41,7 +41,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo // confirm size and timestamp var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse var volumeFileName, idxFileName, datFileName string - err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { var err error volFileInfoResp, err = client.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{ diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 67efc0f6d..256e7c447 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -106,7 +106,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) - err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { // copy ec data slices for _, shardId := range req.ShardIds { diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 875cc2999..bdb6b61a9 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -98,11 +98,11 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { }, nil } -func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) + return fn(ctx2, client) }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) } @@ -137,7 +137,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm return os.ErrExist } - return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { dir, name := filer2.FullPath(fullDirPath).DirAndName() request := &filer_pb.CreateEntryRequest{ Directory: dir, @@ -186,7 +186,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f } dir, name := filer2.FullPath(fullFilePath).DirAndName() - err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { if err := filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{ Directory: dir, Entry: &filer_pb.Entry{ @@ -251,7 +251,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) //_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath) } dir, name := filer2.FullPath(fullFilePath).DirAndName() - err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ Directory: dir, @@ -310,7 +310,7 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName() newDir, newBaseName := filer2.FullPath(newName).DirAndName() - return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: oldDir, @@ -385,7 +385,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { var fileId, host string var auth security.EncodedJwt - if err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + if err = f.fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -429,7 +429,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { f.entry.Chunks = append(f.entry.Chunks, chunk) dir, _ := filer2.FullPath(f.name).DirAndName() - err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = f.fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { f.entry.Attributes.Mtime = time.Now().Unix() request := &filer_pb.UpdateEntryRequest{ diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 2beed4742..e187d5a3b 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -56,7 +56,7 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) - err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { if targetServer.info.Id != existingLocation { @@ -216,7 +216,7 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{ VolumeId: uint32(volumeId), Collection: collection, @@ -232,7 +232,7 @@ func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption, fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{ VolumeId: uint32(volumeId), ShardIds: toBeUnmountedhardIds, @@ -246,7 +246,7 @@ func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption, fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 1f9ad2ff9..8a705a5ae 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -99,7 +99,7 @@ func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume - if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(vid), }) @@ -132,7 +132,7 @@ func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, v fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{ VolumeId: uint32(vid), Collection: collection, @@ -170,7 +170,7 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB continue } - err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation) diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 58527abf2..587b59388 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -120,7 +120,7 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol for _, location := range locations { - err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: uint32(volumeId), }) @@ -138,7 +138,7 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 2e2fca743..600a8cb45 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -170,7 +170,7 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder * func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) { - err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(ctx, &volume_server_pb.VolumeEcShardsRebuildRequest{ VolumeId: uint32(volumeId), Collection: collection, @@ -209,7 +209,7 @@ func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder var copyErr error if applyBalancing { - copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index 9db36e9d1..238dee7f9 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -46,7 +46,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write dir, name := filer2.FullPath(path).DirAndName() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index 2c46350b2..d6ea51d0c 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -82,12 +82,12 @@ func duTraverseDirectory(ctx context.Context, writer io.Writer, filerClient file return } -func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error { +func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error { filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000) - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) + return fn(ctx2, client) }, filerGrpcAddress, env.option.GrpcDialOption) } @@ -105,6 +105,6 @@ func (env *CommandEnv) getFilerClient(filerServer string, filerPort int64) *comm filerPort: filerPort, } } -func (c *commandFilerClient) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { +func (c *commandFilerClient) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error { return c.env.withFilerClient(ctx, c.filerServer, c.filerPort, fn) } diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index 5908b0a3c..9980f67a2 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -45,7 +45,7 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W dir, name := filer2.FullPath(path).DirAndName() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go index 52dd0321b..8f2ef95e3 100644 --- a/weed/shell/command_fs_meta_load.go +++ b/weed/shell/command_fs_meta_load.go @@ -55,7 +55,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. ctx := context.Background() - err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { sizeBuf := make([]byte, 4) diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go index 67606ab53..e77755921 100644 --- a/weed/shell/command_fs_mv.go +++ b/weed/shell/command_fs_mv.go @@ -53,7 +53,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer destinationDir, destinationName := filer2.FullPath(destinationPath).DirAndName() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { // collect destination entry info destinationRequest := &filer_pb.LookupDirectoryEntryRequest{ diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 6f35dd5d2..7a1a77cbe 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -113,7 +113,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, break } - err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{ VolumeId: volumeInfo.Id, SourceDataNode: sourceNode.dataNode.Id, diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go index 50a307492..21bc342b4 100644 --- a/weed/shell/command_volume_mount.go +++ b/weed/shell/command_volume_mount.go @@ -51,7 +51,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io } func mountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(volumeId), }) diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index e74b43ed4..2e39c0600 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -88,7 +88,7 @@ func LiveMoveVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeI func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) { - err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { resp, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{ VolumeId: uint32(volumeId), SourceDataNode: sourceVolumeServer, @@ -104,7 +104,7 @@ func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) { - return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := volumeServerClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{ VolumeId: uint32(volumeId), SinceNs: lastAppendAtNs, @@ -117,7 +117,7 @@ func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne } func deleteVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{ VolumeId: uint32(volumeId), }) diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go index 4584289d7..0f1a1bb6e 100644 --- a/weed/shell/command_volume_tier_download.go +++ b/weed/shell/command_volume_tier_download.go @@ -118,7 +118,7 @@ func doVolumeTierDownload(ctx context.Context, commandEnv *CommandEnv, writer io func downloadDatFromRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error { - err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(ctx, &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index 0a9e6165f..20da1187c 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -114,7 +114,7 @@ func doVolumeTierUpload(ctx context.Context, commandEnv *CommandEnv, writer io.W func uploadDatToRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error { - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(ctx, &volume_server_pb.VolumeTierMoveDatToRemoteRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go index 8096f34d8..826258dfb 100644 --- a/weed/shell/command_volume_unmount.go +++ b/weed/shell/command_volume_unmount.go @@ -51,7 +51,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer } func unmountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, unmountErr := volumeServerClient.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{ VolumeId: uint32(volumeId), }) diff --git a/weed/shell/commands.go b/weed/shell/commands.go index a6a0f7dec..f1fcb62d4 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -70,7 +70,7 @@ func (ce *CommandEnv) checkDirectory(ctx context.Context, filerServer string, fi dir, name := filer2.FullPath(path).DirAndName() - return ce.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + return ce.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { resp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 27406451f..47e061d05 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -230,7 +230,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId) - err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupEcVolumeRequest{ VolumeId: uint32(ecVolume.VolumeId), } @@ -278,7 +278,7 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { - err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { // copy data slice shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{ diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go index e027d2887..2ac907f6c 100644 --- a/weed/storage/store_ec_delete.go +++ b/weed/storage/store_ec_delete.go @@ -87,7 +87,7 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shar func (s *Store) doDeleteNeedleFromRemoteEcShard(ctx context.Context, sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error { - return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { // copy data slice _, err := client.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{ diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index ec29c895e..3763d5515 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -64,8 +64,6 @@ update needle map when receiving new .dat bytes. But seems not necessary now.) func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error { - ctx := context.Background() - startFromOffset, _, _ := v.FileStat() appendAtNs, err := v.findLastAppendAtNs() if err != nil { @@ -74,7 +72,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial writeOffset := int64(startFromOffset) - err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{ VolumeId: uint32(v.Id), diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go index e5dc48652..6ca987bc5 100644 --- a/weed/topology/allocate_volume.go +++ b/weed/topology/allocate_volume.go @@ -15,7 +15,7 @@ type AllocateVolumeResult struct { func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error { - return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{ VolumeId: uint32(vid), diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index ca626e973..e7dbf9b1e 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -19,8 +19,8 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi errCount := int32(0) for index, dn := range locationlist.list { go func(index int, url string, vid needle.VolumeId) { - err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ + err := operation.WithVolumeServerClient(url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, err := volumeServerClient.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: uint32(vid), }) if err != nil { @@ -63,8 +63,8 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, for index, dn := range locationlist.list { go func(index int, url string, vid needle.VolumeId) { glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) - err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ + err := operation.WithVolumeServerClient(url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { + _, err := volumeServerClient.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{ VolumeId: uint32(vid), }) return err @@ -93,8 +93,8 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v isCommitSuccess := true for _, dn := range locationlist.list { glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ + err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { + _, err := volumeServerClient.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: uint32(vid), }) return err @@ -114,8 +114,8 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { for _, dn := range locationlist.list { glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{ + err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { + _, err := volumeServerClient.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{ VolumeId: uint32(vid), }) return err diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go index 63519d97a..7e396342b 100644 --- a/weed/util/grpc_client_server.go +++ b/weed/util/grpc_client_server.go @@ -57,14 +57,14 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr return grpc.DialContext(ctx, address, options...) } -func WithCachedGrpcClient(ctx context.Context, fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { +func WithCachedGrpcClient(ctx context.Context, fn func(context.Context, *grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { grpcClientsLock.Lock() existingConnection, found := grpcClients[address] if found { grpcClientsLock.Unlock() - err := fn(existingConnection) + err := fn(ctx, existingConnection) if err != nil { grpcClientsLock.Lock() delete(grpcClients, address) @@ -83,7 +83,7 @@ func WithCachedGrpcClient(ctx context.Context, fn func(*grpc.ClientConn) error, grpcClients[address] = grpcConnection grpcClientsLock.Unlock() - err = fn(grpcConnection) + err = fn(ctx, grpcConnection) if err != nil { grpcClientsLock.Lock() delete(grpcClients, address) diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 111514f5e..30b0cf160 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -125,9 +125,9 @@ func withMasterClient(ctx context.Context, master string, grpcDialOption grpc.Di return fmt.Errorf("failed to parse master grpc %v: %v", master, parseErr) } - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) - return fn(ctx, client) + return fn(ctx2, client) }, masterGrpcAddress, grpcDialOption) }