Browse Source

filer: compress stored metadata

pull/1449/head
Chris Lu 4 years ago
parent
commit
b8f32bcab9
  1. 8
      weed/filer/abstract_sql/abstract_sql_store.go
  2. 8
      weed/filer/cassandra/cassandra_store.go
  3. 12
      weed/filer/etcd/etcd_store.go
  4. 8
      weed/filer/leveldb/leveldb_store.go
  5. 9
      weed/filer/leveldb2/leveldb2_store.go
  6. 8
      weed/filer/mongodb/mongodb_store.go
  7. 6
      weed/filer/redis/universal_redis_store.go
  8. 6
      weed/filer/redis2/universal_redis_store.go
  9. 6
      weed/shell/command_fs_meta_cat.go
  10. 37
      weed/util/compression.go

8
weed/filer/abstract_sql/abstract_sql_store.go

@ -67,6 +67,10 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
if len(entry.Chunks) > 50 {
meta = util.MaybeGzipData(meta)
}
res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta)
if err != nil {
if !strings.Contains(strings.ToLower(err.Error()), "duplicate") {
@ -126,7 +130,7 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full
entry := &filer.Entry{
FullPath: fullpath,
}
if err := entry.DecodeAttributesAndChunks(data); err != nil {
if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@ -188,7 +192,7 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
entry := &filer.Entry{
FullPath: util.NewFullPath(string(fullpath), name),
}
if err = entry.DecodeAttributesAndChunks(data); err != nil {
if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
}

8
weed/filer/cassandra/cassandra_store.go

@ -60,6 +60,10 @@ func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
if len(entry.Chunks) > 50 {
meta = util.MaybeGzipData(meta)
}
if err := store.session.Query(
"INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
dir, name, meta, entry.TtlSec).Exec(); err != nil {
@ -93,7 +97,7 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa
entry = &filer.Entry{
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks(data)
err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@ -144,7 +148,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath
entry := &filer.Entry{
FullPath: util.NewFullPath(string(fullpath), name),
}
if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break

12
weed/filer/etcd/etcd_store.go

@ -76,12 +76,16 @@ func (store *EtcdStore) RollbackTransaction(ctx context.Context) error {
func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
key := genKey(entry.DirAndName())
value, err := entry.EncodeAttributesAndChunks()
meta, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if _, err := store.client.Put(ctx, string(key), string(value)); err != nil {
if len(entry.Chunks) > 50 {
meta = weed_util.MaybeGzipData(meta)
}
if _, err := store.client.Put(ctx, string(key), string(meta)); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
@ -107,7 +111,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPa
entry = &filer.Entry{
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value)
err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(resp.Kvs[0].Value))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@ -163,7 +167,7 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_
entry := &filer.Entry{
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil {
if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(kv.Value)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break

8
weed/filer/leveldb/leveldb_store.go

@ -78,6 +78,10 @@ func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
value = weed_util.MaybeGzipData(value)
}
err = store.db.Put(key, value, nil)
if err != nil {
@ -109,7 +113,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
entry = &filer.Entry{
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks(data)
err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData((data)))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@ -187,7 +191,7 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath we
entry := &filer.Entry{
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break

9
weed/filer/leveldb2/leveldb2_store.go

@ -85,6 +85,10 @@ func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
value = weed_util.MaybeGzipData(value)
}
err = store.dbs[partitionId].Put(key, value, nil)
if err != nil {
@ -117,7 +121,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.Fu
entry = &filer.Entry{
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks(data)
err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@ -199,8 +203,7 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath w
}
// println("list", entry.FullPath, "chunks", len(entry.Chunks))
if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break

8
weed/filer/mongodb/mongodb_store.go

@ -101,6 +101,10 @@ func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
if len(entry.Chunks) > 50 {
meta = util.MaybeGzipData(meta)
}
c := store.connect.Database(store.database).Collection(store.collectionName)
_, err = c.InsertOne(ctx, Model{
@ -140,7 +144,7 @@ func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks(data.Meta)
err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@ -197,7 +201,7 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath ut
entry := &filer.Entry{
FullPath: util.NewFullPath(string(fullpath), data.Name),
}
if decodeErr := entry.DecodeAttributesAndChunks(data.Meta); decodeErr != nil {
if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break

6
weed/filer/redis/universal_redis_store.go

@ -40,6 +40,10 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
value = util.MaybeGzipData(value)
}
_, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
if err != nil {
@ -76,7 +80,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.F
entry = &filer.Entry{
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks([]byte(data))
err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}

6
weed/filer/redis2/universal_redis_store.go

@ -38,6 +38,10 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
value = util.MaybeGzipData(value)
}
if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
@ -71,7 +75,7 @@ func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.
entry = &filer.Entry{
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks([]byte(data))
err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}

6
weed/shell/command_fs_meta_cat.go

@ -70,8 +70,10 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
fmt.Fprintf(writer, "%s\n", text)
bytes, err := proto.Marshal(respLookupEntry.Entry)
fmt.Fprintf(writer, "chunks %d meta size: %d\n", len(respLookupEntry.Entry.Chunks), len(bytes))
bytes, _ := proto.Marshal(respLookupEntry.Entry)
gzippedBytes, _ := util.GzipData(bytes)
zstdBytes, _ := util.ZstdData(bytes)
fmt.Fprintf(writer, "chunks %d meta size: %d gzip:%d zstd:%d\n", len(respLookupEntry.Entry.Chunks), len(bytes), len(gzippedBytes), len(zstdBytes))
return nil

37
weed/util/compression.go

@ -12,15 +12,44 @@ import (
"github.com/klauspost/compress/zstd"
)
var(
UnsupportedCompression = fmt.Errorf("unsupported compression")
)
func MaybeGzipData(input []byte) ([]byte) {
if IsGzippedContent(input) {
return input
}
gzipped, err := GzipData(input)
if err != nil {
return input
}
if len(gzipped) * 10 > len(input) * 9 {
return input
}
return gzipped
}
func MaybeDecompressData(input []byte) ([]byte) {
uncompressed, err := DecompressData(input)
if err != nil {
if err != UnsupportedCompression {
glog.Errorf("decompressed data: %v", err)
}
return input
}
return uncompressed
}
func GzipData(input []byte) ([]byte, error) {
buf := new(bytes.Buffer)
w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed)
if _, err := w.Write(input); err != nil {
glog.V(2).Infoln("error compressing data:", err)
glog.V(2).Infof("error gzip data: %v", err)
return nil, err
}
if err := w.Close(); err != nil {
glog.V(2).Infoln("error closing compressed data:", err)
glog.V(2).Infof("error closing gzipped data: %v", err)
return nil, err
}
return buf.Bytes(), nil
@ -39,7 +68,7 @@ func DecompressData(input []byte) ([]byte, error) {
if IsZstdContent(input) {
return unzstdData(input)
}
return input, fmt.Errorf("unsupported compression")
return input, UnsupportedCompression
}
func ungzipData(input []byte) ([]byte, error) {
@ -48,7 +77,7 @@ func ungzipData(input []byte) ([]byte, error) {
defer r.Close()
output, err := ioutil.ReadAll(r)
if err != nil {
glog.V(2).Infoln("error uncompressing data:", err)
glog.V(2).Infof("error ungzip data: %v", err)
}
return output, err
}

Loading…
Cancel
Save