From 3e362451d226d9e19b4b652a02926dedc02f6cf9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 13 Nov 2020 12:10:55 -0800 Subject: [PATCH] add example of watch files --- .../java/seaweedfs/client/FilerClient.java | 15 +++++-- .../examples/UnzipFile.java} | 12 +++--- .../com/seaweedfs/examples/WatchFiles.java | 42 +++++++++++++++++++ 3 files changed, 59 insertions(+), 10 deletions(-) rename other/java/examples/src/main/java/com/{example/test/Example.java => seaweedfs/examples/UnzipFile.java} (83%) create mode 100644 other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 035b2c852..7338d5bee 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -275,9 +275,9 @@ public class FilerClient { try { FilerProto.CreateEntryResponse createEntryResponse = filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(parent) - .setEntry(entry) - .build()); + .setDirectory(parent) + .setEntry(entry) + .build()); if (Strings.isNullOrEmpty(createEntryResponse.getError())) { return true; } @@ -333,4 +333,13 @@ public class FilerClient { return true; } + public Iterator watch(String prefix, String clientName, long sinceNs) { + return filerGrpcClient.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder() + .setPathPrefix(prefix) + .setClientName(clientName) + .setSinceNs(sinceNs) + .build() + ); + } + } diff --git a/other/java/examples/src/main/java/com/example/test/Example.java b/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java similarity index 83% rename from other/java/examples/src/main/java/com/example/test/Example.java rename to other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java index 3d22329a8..0529a5c73 100644 --- a/other/java/examples/src/main/java/com/example/test/Example.java +++ b/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java @@ -1,4 +1,4 @@ -package com.example.test; +package com.seaweedfs.examples; import seaweed.hdfs.SeaweedInputStream; import seaweedfs.client.FilerClient; @@ -10,22 +10,20 @@ import java.io.InputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; -public class Example { - - public static FilerClient filerClient = new FilerClient("localhost", 18888); - public static FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888); +public class UnzipFile { public static void main(String[] args) throws IOException { + FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888); + FilerClient filerClient = new FilerClient(filerGrpcClient); + long startTime = System.currentTimeMillis(); - // 本地模式,速度很快 parseZip("/Users/chris/tmp/test.zip"); long startTime2 = System.currentTimeMillis(); long localProcessTime = startTime2 - startTime; - // swfs读取,慢 SeaweedInputStream seaweedInputStream = new SeaweedInputStream( filerGrpcClient, new org.apache.hadoop.fs.FileSystem.Statistics(""), diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java b/other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java new file mode 100644 index 000000000..c4f4c81b0 --- /dev/null +++ b/other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java @@ -0,0 +1,42 @@ +package com.seaweedfs.examples; + +import seaweedfs.client.FilerClient; +import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.FilerProto; + +import java.io.IOException; +import java.util.Iterator; + +public class WatchFiles { + + public static void main(String[] args) throws IOException { + FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888); + FilerClient filerClient = new FilerClient(filerGrpcClient); + + Iterator watch = filerClient.watch( + "/buckets", + "exampleClient", + System.currentTimeMillis() * 1000000L + ); + + while (watch.hasNext()) { + FilerProto.SubscribeMetadataResponse event = watch.next(); + FilerProto.EventNotification notification = event.getEventNotification(); + if (notification.getNewParentPath() != null) { + // move an entry to a new directory, possibly with a new name + if (notification.hasOldEntry() && notification.hasNewEntry()) { + System.out.println("move " + event.getDirectory() + "/" + notification.getOldEntry().getName() + " to " + notification.getNewParentPath() + "/" + notification.getNewEntry().getName()); + } else { + System.out.println("this should not happen."); + } + } else if (notification.hasNewEntry() && !notification.hasOldEntry()) { + System.out.println("create entry " + event.getDirectory() + "/" + notification.getNewEntry().getName()); + } else if (!notification.hasNewEntry() && notification.hasOldEntry()) { + System.out.println("delete entry " + event.getDirectory() + "/" + notification.getOldEntry().getName()); + } else if (notification.hasNewEntry() && notification.hasOldEntry()) { + System.out.println("updated entry " + event.getDirectory() + "/" + notification.getNewEntry().getName()); + } + } + + } +}