Chris Lu
3 years ago
6 changed files with 484 additions and 99 deletions
-
109other/java/client/src/main/java/seaweedfs/client/ReadChunks.java
-
90other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
-
123other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
-
23weed/filer/filechunks.go
-
119weed/filer/filechunks_read.go
-
119weed/filer/filechunks_read_test.go
@ -0,0 +1,109 @@ |
|||
package seaweedfs.client; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.ArrayList; |
|||
import java.util.Collections; |
|||
import java.util.Comparator; |
|||
import java.util.List; |
|||
|
|||
public class ReadChunks { |
|||
|
|||
public static List<SeaweedRead.VisibleInterval> readResolvedChunks(List<FilerProto.FileChunk> chunkList) throws IOException { |
|||
List<Point> points = new ArrayList<>(chunkList.size() * 2); |
|||
for (FilerProto.FileChunk chunk : chunkList) { |
|||
points.add(new Point(chunk.getOffset(), chunk, true)); |
|||
points.add(new Point(chunk.getOffset() + chunk.getSize(), chunk, false)); |
|||
} |
|||
Collections.sort(points, new Comparator<Point>() { |
|||
@Override |
|||
public int compare(Point a, Point b) { |
|||
int x = (int) (a.x - b.x); |
|||
if (a.x != b.x) { |
|||
return (int) (a.x - b.x); |
|||
} |
|||
if (a.ts != b.ts) { |
|||
return (int) (a.ts - b.ts); |
|||
} |
|||
if (!a.isStart) { |
|||
return -1; |
|||
} |
|||
return 1; |
|||
} |
|||
}); |
|||
|
|||
long prevX = 0; |
|||
List<SeaweedRead.VisibleInterval> visibles = new ArrayList<>(); |
|||
ArrayList<Point> queue = new ArrayList<>(); |
|||
for (Point point : points) { |
|||
if (point.isStart) { |
|||
if (queue.size() > 0) { |
|||
int lastIndex = queue.size() - 1; |
|||
Point lastPoint = queue.get(lastIndex); |
|||
if (point.x != prevX && lastPoint.ts < point.ts) { |
|||
addToVisibles(visibles, prevX, lastPoint, point); |
|||
prevX = point.x; |
|||
} |
|||
} |
|||
// insert into queue |
|||
for (int i = queue.size(); i >= 0; i--) { |
|||
if (i == 0 || queue.get(i - 1).ts <= point.ts) { |
|||
if (i == queue.size()) { |
|||
prevX = point.x; |
|||
} |
|||
queue.add(i, point); |
|||
break; |
|||
} |
|||
} |
|||
} else { |
|||
int lastIndex = queue.size() - 1; |
|||
int index = lastIndex; |
|||
Point startPoint = null; |
|||
for (; index >= 0; index--) { |
|||
startPoint = queue.get(index); |
|||
if (startPoint.ts == point.ts) { |
|||
queue.remove(index); |
|||
break; |
|||
} |
|||
} |
|||
if (index == lastIndex && startPoint != null) { |
|||
addToVisibles(visibles, prevX, startPoint, point); |
|||
prevX = point.x; |
|||
} |
|||
} |
|||
} |
|||
|
|||
return visibles; |
|||
|
|||
} |
|||
|
|||
private static void addToVisibles(List<SeaweedRead.VisibleInterval> visibles, long prevX, Point startPoint, Point point) { |
|||
if (prevX < point.x) { |
|||
FilerProto.FileChunk chunk = startPoint.chunk; |
|||
visibles.add(new SeaweedRead.VisibleInterval( |
|||
prevX, |
|||
point.x, |
|||
chunk.getFileId(), |
|||
chunk.getMtime(), |
|||
prevX - chunk.getOffset(), |
|||
chunk.getOffset() == prevX && chunk.getSize() == prevX - startPoint.x, |
|||
chunk.getCipherKey().toByteArray(), |
|||
chunk.getIsCompressed() |
|||
)); |
|||
} |
|||
} |
|||
|
|||
static class Point { |
|||
long x; |
|||
long ts; |
|||
FilerProto.FileChunk chunk; |
|||
boolean isStart; |
|||
|
|||
public Point(long x, FilerProto.FileChunk chunk, boolean isStart) { |
|||
this.x = x; |
|||
this.ts = chunk.getMtime(); |
|||
this.chunk = chunk; |
|||
this.isStart = isStart; |
|||
} |
|||
} |
|||
|
|||
} |
@ -0,0 +1,119 @@ |
|||
package filer |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"sort" |
|||
) |
|||
|
|||
func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) { |
|||
|
|||
var points []*Point |
|||
for _, chunk := range chunks { |
|||
points = append(points, &Point{ |
|||
x: chunk.Offset, |
|||
ts: chunk.Mtime, |
|||
chunk: chunk, |
|||
isStart: true, |
|||
}) |
|||
points = append(points, &Point{ |
|||
x: chunk.Offset + int64(chunk.Size), |
|||
ts: chunk.Mtime, |
|||
chunk: chunk, |
|||
isStart: false, |
|||
}) |
|||
} |
|||
sort.Slice(points, func(i, j int) bool { |
|||
if points[i].x != points[j].x { |
|||
return points[i].x < points[j].x |
|||
} |
|||
if points[i].ts != points[j].ts { |
|||
return points[i].ts < points[j].ts |
|||
} |
|||
if !points[i].isStart { |
|||
return true |
|||
} |
|||
return false |
|||
}) |
|||
|
|||
var prevX int64 |
|||
var queue []*Point |
|||
for _, point := range points { |
|||
if point.isStart { |
|||
if len(queue) > 0 { |
|||
lastIndex := len(queue) -1 |
|||
lastPoint := queue[lastIndex] |
|||
if point.x != prevX && lastPoint.ts < point.ts { |
|||
visibles = addToVisibles(visibles, prevX, lastPoint, point) |
|||
prevX = point.x |
|||
} |
|||
} |
|||
// insert into queue
|
|||
for i := len(queue); i >= 0; i-- { |
|||
if i == 0 || queue[i-1].ts <= point.ts { |
|||
if i == len(queue) { |
|||
prevX = point.x |
|||
} |
|||
queue = addToQueue(queue, i, point) |
|||
break |
|||
} |
|||
} |
|||
} else { |
|||
lastIndex := len(queue) - 1 |
|||
index := lastIndex |
|||
var startPoint *Point |
|||
for ; index >= 0; index-- { |
|||
startPoint = queue[index] |
|||
if startPoint.ts == point.ts { |
|||
queue = removeFromQueue(queue, index) |
|||
break |
|||
} |
|||
} |
|||
if index == lastIndex && startPoint != nil { |
|||
visibles = addToVisibles(visibles, prevX, startPoint, point) |
|||
prevX = point.x |
|||
} |
|||
} |
|||
} |
|||
|
|||
return |
|||
} |
|||
|
|||
func removeFromQueue(queue []*Point, index int) []*Point { |
|||
for i := index; i < len(queue)-1; i++ { |
|||
queue[i] = queue[i+1] |
|||
} |
|||
queue = queue[:len(queue)-1] |
|||
return queue |
|||
} |
|||
|
|||
func addToQueue(queue []*Point, index int, point *Point) []*Point { |
|||
queue = append(queue, point) |
|||
for i := len(queue) - 1; i > index; i-- { |
|||
queue[i], queue[i-1] = queue[i-1], queue[i] |
|||
} |
|||
return queue |
|||
} |
|||
|
|||
func addToVisibles(visibles []VisibleInterval, prevX int64, startPoint *Point, point *Point) []VisibleInterval { |
|||
if prevX < point.x { |
|||
chunk := startPoint.chunk |
|||
visibles = append(visibles, VisibleInterval{ |
|||
start: prevX, |
|||
stop: point.x, |
|||
fileId: chunk.FileId, |
|||
modifiedTime: chunk.Mtime, |
|||
chunkOffset: prevX - chunk.Offset, |
|||
chunkSize: chunk.Size, |
|||
cipherKey: chunk.CipherKey, |
|||
isGzipped: chunk.IsCompressed, |
|||
}) |
|||
} |
|||
return visibles |
|||
} |
|||
|
|||
type Point struct { |
|||
x int64 |
|||
ts int64 |
|||
chunk *filer_pb.FileChunk |
|||
isStart bool |
|||
} |
@ -0,0 +1,119 @@ |
|||
package filer |
|||
|
|||
import ( |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"math/rand" |
|||
"testing" |
|||
) |
|||
|
|||
func TestReadResolvedChunks(t *testing.T) { |
|||
|
|||
chunks := []*filer_pb.FileChunk{ |
|||
{ |
|||
FileId: "a", |
|||
Offset: 0, |
|||
Size: 100, |
|||
Mtime: 1, |
|||
}, |
|||
{ |
|||
FileId: "b", |
|||
Offset: 50, |
|||
Size: 100, |
|||
Mtime: 2, |
|||
}, |
|||
{ |
|||
FileId: "c", |
|||
Offset: 200, |
|||
Size: 50, |
|||
Mtime: 3, |
|||
}, |
|||
{ |
|||
FileId: "d", |
|||
Offset: 250, |
|||
Size: 50, |
|||
Mtime: 4, |
|||
}, |
|||
{ |
|||
FileId: "e", |
|||
Offset: 175, |
|||
Size: 100, |
|||
Mtime: 5, |
|||
}, |
|||
} |
|||
|
|||
visibles := readResolvedChunks(chunks) |
|||
|
|||
for _, visible := range visibles { |
|||
fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTime) |
|||
} |
|||
|
|||
} |
|||
|
|||
func TestRandomizedReadResolvedChunks(t *testing.T) { |
|||
|
|||
var limit int64 = 1024*1024 |
|||
array := make([]int64, limit) |
|||
var chunks []*filer_pb.FileChunk |
|||
for ts := int64(0); ts < 1024; ts++ { |
|||
x := rand.Int63n(limit) |
|||
y := rand.Int63n(limit) |
|||
size := x - y |
|||
if size < 0 { |
|||
size = -size |
|||
} |
|||
if size > 1024 { |
|||
size = 1024 |
|||
} |
|||
start := x |
|||
if start > y { |
|||
start = y |
|||
} |
|||
chunks = append(chunks, randomWrite(array, start, size, ts)) |
|||
} |
|||
|
|||
visibles := readResolvedChunks(chunks) |
|||
|
|||
for _, visible := range visibles { |
|||
for i := visible.start; i<visible.stop;i++{ |
|||
if array[i] != visible.modifiedTime { |
|||
t.Errorf("position %d expected ts %d actual ts %d", i, array[i], visible.modifiedTime) |
|||
} |
|||
} |
|||
} |
|||
|
|||
// fmt.Printf("visibles %d", len(visibles))
|
|||
|
|||
} |
|||
|
|||
func randomWrite(array []int64, start int64, size int64, ts int64) *filer_pb.FileChunk { |
|||
for i := start; i < start+size; i++ { |
|||
array[i] = ts |
|||
} |
|||
// fmt.Printf("write [%d,%d) %d\n", start, start+size, ts)
|
|||
return &filer_pb.FileChunk{ |
|||
FileId: "", |
|||
Offset: start, |
|||
Size: uint64(size), |
|||
Mtime: ts, |
|||
} |
|||
} |
|||
|
|||
func TestSequentialReadResolvedChunks(t *testing.T) { |
|||
|
|||
var chunkSize int64 = 1024*1024*2 |
|||
var chunks []*filer_pb.FileChunk |
|||
for ts := int64(0); ts < 13; ts++ { |
|||
chunks = append(chunks, &filer_pb.FileChunk{ |
|||
FileId: "", |
|||
Offset: chunkSize*ts, |
|||
Size: uint64(chunkSize), |
|||
Mtime: 1, |
|||
}) |
|||
} |
|||
|
|||
visibles := readResolvedChunks(chunks) |
|||
|
|||
fmt.Printf("visibles %d", len(visibles)) |
|||
|
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue