|
|
@ -32,6 +32,7 @@ type AbstractSqlStore struct { |
|
|
SupportBucketTable bool |
|
|
SupportBucketTable bool |
|
|
dbs map[string]bool |
|
|
dbs map[string]bool |
|
|
dbsLock sync.Mutex |
|
|
dbsLock sync.Mutex |
|
|
|
|
|
RetryableErrorCallback func(err error) bool |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
var _ filer.BucketAware = (*AbstractSqlStore)(nil) |
|
|
var _ filer.BucketAware = (*AbstractSqlStore)(nil) |
|
|
@ -151,63 +152,80 @@ func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.Full |
|
|
|
|
|
|
|
|
func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { |
|
|
func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { |
|
|
|
|
|
|
|
|
db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("findDB %s : %v", entry.FullPath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// define the work to be done
|
|
|
|
|
|
var doInsert func() error |
|
|
|
|
|
doInsert = func() error { |
|
|
|
|
|
db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("findDB %s : %v", entry.FullPath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
dir, name := shortPath.DirAndName() |
|
|
|
|
|
meta, err := entry.EncodeAttributesAndChunks() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("encode %s: %s", entry.FullPath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
dir, name := shortPath.DirAndName() |
|
|
|
|
|
meta, err := entry.EncodeAttributesAndChunks() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("encode %s: %s", entry.FullPath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if len(entry.GetChunks()) > filer.CountEntryChunksForGzip { |
|
|
|
|
|
meta = util.MaybeGzipData(meta) |
|
|
|
|
|
} |
|
|
|
|
|
sqlInsert := "insert" |
|
|
|
|
|
res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta) |
|
|
|
|
|
if err != nil && strings.Contains(strings.ToLower(err.Error()), "duplicate entry") { |
|
|
|
|
|
// now the insert failed possibly due to duplication constraints
|
|
|
|
|
|
sqlInsert = "falls back to update" |
|
|
|
|
|
glog.V(1).InfofCtx(ctx, "insert %s %s: %v", entry.FullPath, sqlInsert, err) |
|
|
|
|
|
res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir) |
|
|
|
|
|
} |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("%s %s: %s", sqlInsert, entry.FullPath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if len(entry.GetChunks()) > filer.CountEntryChunksForGzip { |
|
|
|
|
|
meta = util.MaybeGzipData(meta) |
|
|
|
|
|
} |
|
|
|
|
|
sqlInsert := "insert" |
|
|
|
|
|
res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta) |
|
|
|
|
|
if err != nil && strings.Contains(strings.ToLower(err.Error()), "duplicate entry") { |
|
|
|
|
|
// now the insert failed possibly due to duplication constraints
|
|
|
|
|
|
sqlInsert = "falls back to update" |
|
|
|
|
|
glog.V(1).InfofCtx(ctx, "insert %s %s: %v", entry.FullPath, sqlInsert, err) |
|
|
|
|
|
res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir) |
|
|
|
|
|
} |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("%s %s: %s", sqlInsert, entry.FullPath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
_, err = res.RowsAffected() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("%s %s but no rows affected: %s", sqlInsert, entry.FullPath, err) |
|
|
|
|
|
|
|
|
_, err = res.RowsAffected() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("%s %s but no rows affected: %s", sqlInsert, entry.FullPath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
|
|
|
|
if store.RetryableErrorCallback != nil { |
|
|
|
|
|
return util.RetryUntil("InsertEntry", doInsert, store.RetryableErrorCallback) |
|
|
|
|
|
} |
|
|
|
|
|
return doInsert() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { |
|
|
func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { |
|
|
|
|
|
|
|
|
db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("findDB %s : %v", entry.FullPath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
var doUpdate func() error |
|
|
|
|
|
doUpdate = func() error { |
|
|
|
|
|
db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("findDB %s : %v", entry.FullPath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
dir, name := shortPath.DirAndName() |
|
|
|
|
|
meta, err := entry.EncodeAttributesAndChunks() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("encode %s: %s", entry.FullPath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
dir, name := shortPath.DirAndName() |
|
|
|
|
|
meta, err := entry.EncodeAttributesAndChunks() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("encode %s: %s", entry.FullPath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("update %s: %s", entry.FullPath, err) |
|
|
|
|
|
|
|
|
res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("update %s: %s", entry.FullPath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
_, err = res.RowsAffected() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err) |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
_, err = res.RowsAffected() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err) |
|
|
|
|
|
|
|
|
if store.RetryableErrorCallback != nil { |
|
|
|
|
|
return util.RetryUntil("UpdateEntry", doUpdate, store.RetryableErrorCallback) |
|
|
} |
|
|
} |
|
|
return nil |
|
|
|
|
|
|
|
|
return doUpdate() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) { |
|
|
func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) { |
|
|
@ -240,56 +258,70 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full |
|
|
|
|
|
|
|
|
func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { |
|
|
func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { |
|
|
|
|
|
|
|
|
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("findDB %s : %v", fullpath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
var doDelete func() error |
|
|
|
|
|
doDelete = func() error { |
|
|
|
|
|
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("findDB %s : %v", fullpath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
dir, name := shortPath.DirAndName() |
|
|
|
|
|
|
|
|
dir, name := shortPath.DirAndName() |
|
|
|
|
|
|
|
|
res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("delete %s: %s", fullpath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("delete %s: %s", fullpath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
_, err = res.RowsAffected() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err) |
|
|
|
|
|
|
|
|
_, err = res.RowsAffected() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err) |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
|
|
|
|
if store.RetryableErrorCallback != nil { |
|
|
|
|
|
return util.RetryUntil("DeleteEntry", doDelete, store.RetryableErrorCallback) |
|
|
|
|
|
} |
|
|
|
|
|
return doDelete() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { |
|
|
func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { |
|
|
|
|
|
|
|
|
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("findDB %s : %v", fullpath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
var doDeleteFolderChildren func() error |
|
|
|
|
|
doDeleteFolderChildren = func() error { |
|
|
|
|
|
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("findDB %s : %v", fullpath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if isValidBucket(bucket) && shortPath == "/" { |
|
|
|
|
|
if err = store.deleteTable(ctx, bucket); err == nil { |
|
|
|
|
|
store.dbsLock.Lock() |
|
|
|
|
|
delete(store.dbs, bucket) |
|
|
|
|
|
store.dbsLock.Unlock() |
|
|
|
|
|
return nil |
|
|
|
|
|
} else { |
|
|
|
|
|
return err |
|
|
|
|
|
|
|
|
if isValidBucket(bucket) && shortPath == "/" { |
|
|
|
|
|
if err = store.deleteTable(ctx, bucket); err == nil { |
|
|
|
|
|
store.dbsLock.Lock() |
|
|
|
|
|
delete(store.dbs, bucket) |
|
|
|
|
|
store.dbsLock.Unlock() |
|
|
|
|
|
return nil |
|
|
|
|
|
} else { |
|
|
|
|
|
return err |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
glog.V(4).InfofCtx(ctx, "delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath))) |
|
|
|
|
|
res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath)) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) |
|
|
|
|
|
|
|
|
glog.V(4).InfofCtx(ctx, "delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath))) |
|
|
|
|
|
res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath)) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
_, err = res.RowsAffected() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err) |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
_, err = res.RowsAffected() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err) |
|
|
|
|
|
|
|
|
if store.RetryableErrorCallback != nil { |
|
|
|
|
|
return util.RetryUntil("DeleteFolderChildren", doDeleteFolderChildren, store.RetryableErrorCallback) |
|
|
} |
|
|
} |
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
return doDeleteFolderChildren() |
|
|
|
|
|
|
|
|
func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { |
|
|
func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { |
|
|
|
|
|
|
|
|
|