From b52b8ec68553781408d1ac8c6f7a2cd4d935aea6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 26 Apr 2020 05:21:54 -0700 Subject: [PATCH] Hadoop: fix entry not found for HCFS also fix cipher related changes. --- .../java/seaweedfs/client/FilerClient.java | 9 ++++--- .../java/seaweedfs/client/SeaweedRead.java | 10 +++++--- .../java/seaweedfs/client/SeaweedWrite.java | 4 +-- weed/server/filer_grpc_server.go | 25 ++++++++++++++++--- 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 84aa26ad9..ef32c7e9a 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -14,7 +14,7 @@ public class FilerClient { private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class); - private FilerGrpcClient filerGrpcClient; + private final FilerGrpcClient filerGrpcClient; public FilerClient(String host, int grpcPort) { filerGrpcClient = new FilerGrpcClient(host, grpcPort); @@ -181,7 +181,7 @@ public class FilerClient { .setLimit(limit) .build()); List entries = new ArrayList<>(); - while (iter.hasNext()){ + while (iter.hasNext()) { FilerProto.ListEntriesResponse resp = iter.next(); entries.add(fixEntryAfterReading(resp.getEntry())); } @@ -195,9 +195,12 @@ public class FilerClient { .setDirectory(directory) .setName(entryName) .build()).getEntry(); + if (entry == null) { + return null; + } return fixEntryAfterReading(entry); } catch (Exception e) { - if (e.getMessage().indexOf("filer: no entry is found in filer store")>0){ + if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) { return null; } LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e); diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index ad92ba006..1e4a158c6 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -99,10 +99,12 @@ public class SeaweedRead { data = Gzip.decompress(data); } - try { - data = SeaweedCipher.decrypt(data, chunkView.cipherKey); - } catch (Exception e) { - throw new IOException("fail to decrypt", e); + if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) { + try { + data = SeaweedCipher.decrypt(data, chunkView.cipherKey); + } catch (Exception e) { + throw new IOException("fail to decrypt", e); + } } return data; diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 178234d5a..dc6203e52 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -36,7 +36,7 @@ public class SeaweedWrite { String auth = response.getAuth(); String targetUrl = String.format("http://%s/%s", url, fileId); - ByteString cipherKeyString = null; + ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; if (filerGrpcClient.isCipher()) { cipherKey = genCipherKey(); @@ -78,7 +78,7 @@ public class SeaweedWrite { HttpClient client = new DefaultHttpClient(); InputStream inputStream = null; - if (cipherKey == null) { + if (cipherKey == null || cipherKey.length == 0) { inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); } else { try { diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 999d14b8e..a790e4601 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -19,9 +19,11 @@ import ( func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) { + glog.V(4).Infof("LookupDirectoryEntry %s", filepath.Join(req.Directory, req.Name)) + entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name)) if err == filer_pb.ErrNotFound { - return &filer_pb.LookupDirectoryEntryResponse{}, nil + return &filer_pb.LookupDirectoryEntryResponse{}, err } if err != nil { glog.V(3).Infof("LookupDirectoryEntry %s: %+v, ", filepath.Join(req.Directory, req.Name), err) @@ -41,6 +43,8 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error { + glog.V(4).Infof("ListEntries %v", req) + limit := int(req.Limit) if limit == 0 { limit = fs.option.DirListingLimit @@ -135,6 +139,8 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) { + glog.V(4).Infof("CreateEntry %v", req) + resp = &filer_pb.CreateEntryResponse{} chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) @@ -163,6 +169,8 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) { + glog.V(4).Infof("UpdateEntry %v", req) + fullpath := util.Join(req.Directory, req.Entry.Name) entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath)) if err != nil { @@ -219,6 +227,8 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) { + glog.V(4).Infof("AppendToEntry %v", req) + fullpath := util.NewFullPath(req.Directory, req.EntryName) var offset int64 = 0 entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath)) @@ -250,6 +260,9 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo } func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) { + + glog.V(4).Infof("DeleteEntry %v", req) + err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData) resp = &filer_pb.DeleteEntryResponse{} if err != nil { @@ -312,6 +325,8 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) { + glog.V(4).Infof("DeleteCollection %v", req) + err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: req.GetCollection(), @@ -353,12 +368,16 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) { - return &filer_pb.GetFilerConfigurationResponse{ + t := &filer_pb.GetFilerConfigurationResponse{ Masters: fs.option.Masters, Collection: fs.option.Collection, Replication: fs.option.DefaultReplication, MaxMb: uint32(fs.option.MaxMB), DirBuckets: fs.filer.DirBucketsPath, Cipher: fs.filer.Cipher, - }, nil + } + + glog.V(4).Infof("GetFilerConfiguration: %v", t) + + return t, nil }