Browse Source

add example of watch files

logrus
Chris Lu 4 years ago
parent
commit
3e362451d2
  1. 15
      other/java/client/src/main/java/seaweedfs/client/FilerClient.java
  2. 12
      other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java
  3. 42
      other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java

15
other/java/client/src/main/java/seaweedfs/client/FilerClient.java

@ -275,9 +275,9 @@ public class FilerClient {
try { try {
FilerProto.CreateEntryResponse createEntryResponse = FilerProto.CreateEntryResponse createEntryResponse =
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parent)
.setEntry(entry)
.build());
.setDirectory(parent)
.setEntry(entry)
.build());
if (Strings.isNullOrEmpty(createEntryResponse.getError())) { if (Strings.isNullOrEmpty(createEntryResponse.getError())) {
return true; return true;
} }
@ -333,4 +333,13 @@ public class FilerClient {
return true; return true;
} }
public Iterator<FilerProto.SubscribeMetadataResponse> watch(String prefix, String clientName, long sinceNs) {
return filerGrpcClient.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder()
.setPathPrefix(prefix)
.setClientName(clientName)
.setSinceNs(sinceNs)
.build()
);
}
} }

12
other/java/examples/src/main/java/com/example/test/Example.java → other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java

@ -1,4 +1,4 @@
package com.example.test;
package com.seaweedfs.examples;
import seaweed.hdfs.SeaweedInputStream; import seaweed.hdfs.SeaweedInputStream;
import seaweedfs.client.FilerClient; import seaweedfs.client.FilerClient;
@ -10,22 +10,20 @@ import java.io.InputStream;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream; import java.util.zip.ZipInputStream;
public class Example {
public static FilerClient filerClient = new FilerClient("localhost", 18888);
public static FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888);
public class UnzipFile {
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888);
FilerClient filerClient = new FilerClient(filerGrpcClient);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
// 本地模式速度很快
parseZip("/Users/chris/tmp/test.zip"); parseZip("/Users/chris/tmp/test.zip");
long startTime2 = System.currentTimeMillis(); long startTime2 = System.currentTimeMillis();
long localProcessTime = startTime2 - startTime; long localProcessTime = startTime2 - startTime;
// swfs读取
SeaweedInputStream seaweedInputStream = new SeaweedInputStream( SeaweedInputStream seaweedInputStream = new SeaweedInputStream(
filerGrpcClient, filerGrpcClient,
new org.apache.hadoop.fs.FileSystem.Statistics(""), new org.apache.hadoop.fs.FileSystem.Statistics(""),

42
other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java

@ -0,0 +1,42 @@
package com.seaweedfs.examples;
import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import java.io.IOException;
import java.util.Iterator;
public class WatchFiles {
public static void main(String[] args) throws IOException {
FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888);
FilerClient filerClient = new FilerClient(filerGrpcClient);
Iterator<FilerProto.SubscribeMetadataResponse> watch = filerClient.watch(
"/buckets",
"exampleClient",
System.currentTimeMillis() * 1000000L
);
while (watch.hasNext()) {
FilerProto.SubscribeMetadataResponse event = watch.next();
FilerProto.EventNotification notification = event.getEventNotification();
if (notification.getNewParentPath() != null) {
// move an entry to a new directory, possibly with a new name
if (notification.hasOldEntry() && notification.hasNewEntry()) {
System.out.println("move " + event.getDirectory() + "/" + notification.getOldEntry().getName() + " to " + notification.getNewParentPath() + "/" + notification.getNewEntry().getName());
} else {
System.out.println("this should not happen.");
}
} else if (notification.hasNewEntry() && !notification.hasOldEntry()) {
System.out.println("create entry " + event.getDirectory() + "/" + notification.getNewEntry().getName());
} else if (!notification.hasNewEntry() && notification.hasOldEntry()) {
System.out.println("delete entry " + event.getDirectory() + "/" + notification.getOldEntry().getName());
} else if (notification.hasNewEntry() && notification.hasOldEntry()) {
System.out.println("updated entry " + event.getDirectory() + "/" + notification.getNewEntry().getName());
}
}
}
}
Loading…
Cancel
Save