Browse Source

able to subscribe any topic from any point of time

pull/1298/head
Chris Lu 5 years ago
parent
commit
9e72e9e4b8
  1. 4
      weed/filer2/filer_notify.go
  2. 54
      weed/filer2/reader_at.go
  3. 12
      weed/filer2/stream.go
  4. 3
      weed/filesys/dir.go
  5. 45
      weed/messaging/broker/broker_grpc_server_subscribe.go
  6. 16
      weed/pb/filer_pb/filer_client.go
  7. 3
      weed/pb/filer_pb/filer_client_bfs.go
  8. 3
      weed/s3api/filer_util.go
  9. 3
      weed/server/webdav_server.go
  10. 3
      weed/shell/command_bucket_list.go
  11. 3
      weed/shell/command_fs_du.go
  12. 5
      weed/shell/command_fs_ls.go
  13. 6
      weed/shell/command_fs_tree.go

4
weed/filer2/filer_notify.go

@ -109,7 +109,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
}
// println("processing", hourMinuteEntry.FullPath)
chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.Chunks)
if err := readEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
if err := ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
chunkedFileReader.Close()
if err == io.EOF {
break
@ -123,7 +123,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
return nil
}
func readEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error {
func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error {
for {
n, err := r.Read(sizeBuf)
if err != nil {

54
weed/filer2/reader_at.go

@ -27,34 +27,40 @@ type ChunkReadAt struct {
// var _ = io.ReaderAt(&ChunkReadAt{})
type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
return func(fileId string) (targetUrl string, err error) {
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
vid := VolumeId(fileId)
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
return err
}
locations := resp.LocationsMap[vid]
if locations == nil || len(locations.Locations) == 0 {
glog.V(0).Infof("failed to locate %s", fileId)
return fmt.Errorf("failed to locate %s", fileId)
}
volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
return nil
})
return
}
}
func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt {
return &ChunkReadAt{
chunkViews: chunkViews,
lookupFileId: func(fileId string) (targetUrl string, err error) {
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
vid := VolumeId(fileId)
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
return err
}
locations := resp.LocationsMap[vid]
if locations == nil || len(locations.Locations) == 0 {
glog.V(0).Infof("failed to locate %s", fileId)
return fmt.Errorf("failed to locate %s", fileId)
}
volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
return nil
})
return
},
lookupFileId: LookupFn(filerClient),
bufferOffset: -1,
chunkCache: chunkCache,
}

12
weed/filer2/stream.go

@ -81,7 +81,7 @@ type ChunkStreamReader struct {
bufferOffset int64
bufferPos int
chunkIndex int
lookupFileId func(fileId string) (targetUrl string, err error)
lookupFileId LookupFileIdFunctionType
}
var _ = io.ReadSeeker(&ChunkStreamReader{})
@ -98,6 +98,16 @@ func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks [
}
}
func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
return &ChunkStreamReader{
chunkViews: chunkViews,
lookupFileId: LookupFn(filerClient),
}
}
func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
for n < len(p) {
if c.isBufferEmpty() {

3
weed/filesys/dir.go

@ -263,7 +263,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath())
cacheTtl := 5 * time.Minute
processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) {
processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
inode := fullpath.AsInode()
if entry.IsDirectory {
@ -274,6 +274,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
ret = append(ret, dirent)
}
dir.wfs.cacheSet(fullpath, entry, cacheTtl)
return nil
}
if dir.wfs.option.AsyncMetaDataCaching {

45
weed/messaging/broker/broker_grpc_server_subscribe.go

@ -3,10 +3,12 @@ package broker
import (
"fmt"
"io"
"strings"
"time"
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
@ -57,6 +59,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
case messaging_pb.SubscriberMessage_InitMessage_LATEST:
case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
}
var processedTsNs int64
// how to process each message
// an error returned will end the subscription
@ -81,9 +84,18 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
return err
}
processedTsNs = logEntry.TsNs
return nil
}
if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
return err
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
}
messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
lock.Mutex.Lock()
lock.cond.Wait()
@ -94,3 +106,36 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err
}
func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
topicDir := fmt.Sprintf("/topics/%s/%s", tp.Namespace, tp.Topic)
return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error {
dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
if dayEntry.Name == startDate {
if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 {
return nil
}
}
// println("processing", hourMinuteEntry.FullPath)
chunkedFileReader := filer2.NewChunkStreamReader(broker, hourMinuteEntry.Chunks)
defer chunkedFileReader.Close()
if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
chunkedFileReader.Close()
if err == io.EOF {
return nil
}
return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err)
}
return nil
}, "", false, 24*60)
}, startDate, true, 366)
}

16
weed/pb/filer_pb/filer_client.go

@ -56,19 +56,21 @@ func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry
return
}
func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn func(entry *Entry, isLast bool)) (err error) {
type EachEntryFunciton func(entry *Entry, isLast bool) error
func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton) (err error) {
return doList(filerClient, fullDirPath, prefix, fn, "", false, math.MaxUint32)
}
func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn func(entry *Entry, isLast bool), startFrom string, inclusive bool, limit uint32) (err error) {
func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
}
func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn func(entry *Entry, isLast bool), startFrom string, inclusive bool, limit uint32) (err error) {
func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
@ -92,7 +94,9 @@ func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, f
if recvErr != nil {
if recvErr == io.EOF {
if prevEntry != nil {
fn(prevEntry, true)
if err := fn(prevEntry, true); err != nil {
return err
}
}
break
} else {
@ -100,7 +104,9 @@ func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, f
}
}
if prevEntry != nil {
fn(prevEntry, false)
if err := fn(prevEntry, false); err != nil {
return err
}
}
prevEntry = resp.Entry
}

3
weed/pb/filer_pb/filer_client_bfs.go

@ -45,7 +45,7 @@ func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(pare
func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
return ReadDirAllEntries(filerClient, parentPath, "", func(entry *Entry, isLast bool) {
return ReadDirAllEntries(filerClient, parentPath, "", func(entry *Entry, isLast bool) error {
fn(parentPath, entry)
@ -57,6 +57,7 @@ func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queu
jobQueueWg.Add(1)
queue.Enqueue(util.FullPath(subDir))
}
return nil
})
}

3
weed/s3api/filer_util.go

@ -23,8 +23,9 @@ func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chun
func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit uint32) (entries []*filer_pb.Entry, err error) {
err = filer_pb.List(s3a, parentDirectoryPath, prefix, func(entry *filer_pb.Entry, isLast bool) {
err = filer_pb.List(s3a, parentDirectoryPath, prefix, func(entry *filer_pb.Entry, isLast bool) error {
entries = append(entries, entry)
return nil
}, startFrom, inclusive, limit)
return

3
weed/server/webdav_server.go

@ -511,7 +511,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
dir, _ := util.FullPath(f.name).DirAndName()
err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) {
err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
fi := FileInfo{
size: int64(filer2.TotalSize(entry.GetChunks())),
name: entry.Name,
@ -525,6 +525,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
}
glog.V(4).Infof("entry: %v", fi.name)
ret = append(ret, &fi)
return nil
})
old := f.off

3
weed/shell/command_bucket_list.go

@ -45,12 +45,13 @@ func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("read buckets: %v", err)
}
err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) {
err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
if entry.Attributes.Replication == "" || entry.Attributes.Replication == "000" {
fmt.Fprintf(writer, " %s\n", entry.Name)
} else {
fmt.Fprintf(writer, " %s\t\t\treplication: %s\n", entry.Name, entry.Attributes.Replication)
}
return nil
}, "", false, math.MaxUint32)
if err != nil {
return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err)

3
weed/shell/command_fs_du.go

@ -54,7 +54,7 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer
func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir, name string) (blockCount, byteCount uint64, err error) {
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) {
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) error {
var fileBlockCount, fileByteCount uint64
@ -78,6 +78,7 @@ func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir
if name != "" && !entry.IsDirectory {
fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", fileBlockCount, fileByteCount, dir, entry.Name)
}
return nil
})
return
}

5
weed/shell/command_fs_ls.go

@ -62,10 +62,10 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer
dir, name := util.FullPath(path).DirAndName()
entryCount := 0
err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) {
err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) error {
if !showHidden && strings.HasPrefix(entry.Name, ".") {
return
return nil
}
entryCount++
@ -100,6 +100,7 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer
fmt.Fprintf(writer, "%s\n", entry.Name)
}
return nil
})
if isLongFormat && err == nil {

6
weed/shell/command_fs_tree.go

@ -51,10 +51,10 @@ func treeTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, d
prefix.addMarker(level)
err = filer_pb.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) {
err = filer_pb.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) error {
if level < 0 && name != "" {
if entry.Name != name {
return
return nil
}
}
@ -70,7 +70,7 @@ func treeTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, d
} else {
fileCount++
}
return nil
})
return
}

Loading…
Cancel
Save