
Merge pull request #1 from chrislusf/master

update from origin
谢烟客 committed 9 years ago (via GitHub) · commit 96f96995d9
26 changed files:

  1. Dockerfile (21)
  2. README.md (3)
  3. docker/Dockerfile (18)
  4. docker/docker-compose.yml (23)
  5. docker/entrypoint.sh (34)
  6. weed/command/compact.go (11)
  7. weed/command/export.go (2)
  8. weed/command/filer.go (6)
  9. weed/command/server.go (4)
  10. weed/filer/cassandra_store/cassandra_store.go (2)
  11. weed/filer/embedded_filer/files_in_leveldb.go (4)
  12. weed/filer/filer.go (6)
  13. weed/filer/mysql_store/README.md (67)
  14. weed/filer/mysql_store/mysql_store.go (270)
  15. weed/filer/mysql_store/mysql_store_test.go (30)
  16. weed/filer/redis_store/redis_store.go (4)
  17. weed/server/filer_server.go (42)
  18. weed/server/filer_server_handlers_read.go (3)
  19. weed/server/filer_server_handlers_write.go (220)
  20. weed/storage/compact_map.go (2)
  21. weed/storage/volume.go (3)
  22. weed/storage/volume_checking.go (1)
  23. weed/storage/volume_vacuum.go (234)
  24. weed/storage/volume_vacuum_test.go (55)
  25. weed/topology/data_node.go (17)
  26. weed/topology/volume_layout.go (13)

Dockerfile (21)

@@ -1,21 +0,0 @@
FROM progrium/busybox
WORKDIR /opt/weed
RUN opkg-install curl
RUN echo tlsv1 >> ~/.curlrc
RUN \
curl -Lks https://bintray.com$(curl -Lk http://bintray.com/chrislusf/seaweedfs/seaweedfs/_latestVersion | grep linux_amd64.tar.gz | sed -n "/href/ s/.*href=['\"]\([^'\"]*\)['\"].*/\1/gp") | gunzip | tar -xf - -C /opt/weed/ && \
mkdir ./bin && mv weed_*/* ./bin && \
chmod +x ./bin/weed
EXPOSE 8080
EXPOSE 9333
VOLUME /data
ENV WEED_HOME /opt/weed
ENV PATH ${PATH}:${WEED_HOME}/bin
ENTRYPOINT ["weed"]

README.md (3)

@@ -53,7 +53,6 @@ SeaweedFS uses HTTP REST operations to write, read, delete. The responses are in
```
> ./weed master
```
### Start Volume Servers ###
@@ -61,9 +60,9 @@ SeaweedFS uses HTTP REST operations to write, read, delete. The responses are in
```
> weed volume -dir="/tmp/data1" -max=5 -mserver="localhost:9333" -port=8080 &
> weed volume -dir="/tmp/data2" -max=10 -mserver="localhost:9333" -port=8081 &
```
### Write File ###
To upload a file: first, send a HTTP POST, PUT, or GET request to `/dir/assign` to get an `fid` and a volume server url:
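For orientation (this is the existing upload flow, not something changed by this commit), a typical exchange with a master on the default port 9333 looks roughly like the sketch below; the fid and volume server address are illustrative values:

```
# ask the master to assign a file id and a volume server location
curl "http://localhost:9333/dir/assign"
# => {"fid":"3,01637037d6","url":"127.0.0.1:8080","publicUrl":"localhost:8080","count":1}

# then upload the file content to the returned volume server under that fid
curl -F file=@/path/to/myphoto.jpg "http://127.0.0.1:8080/3,01637037d6"
```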

docker/Dockerfile (18)

@@ -0,0 +1,18 @@
FROM progrium/busybox
RUN opkg-install curl
RUN echo tlsv1 >> ~/.curlrc
RUN curl -Lks https://bintray.com$(curl -Lk http://bintray.com/chrislusf/seaweedfs/seaweedfs/_latestVersion | grep linux_amd64.tar.gz | sed -n "/href/ s/.*href=['\"]\([^'\"]*\)['\"].*/\1/gp") | gunzip | tar -xf - && \
mv go_*amd64/weed /usr/bin/ && \
rm -r go_*amd64
EXPOSE 8080
EXPOSE 9333
VOLUME /data
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]

docker/docker-compose.yml (23)

@@ -0,0 +1,23 @@
version: '2'

services:
  master:
    image: chrislusf/seaweedfs
    ports:
      - 9333:9333
    command: "master"
    networks:
      default:
        aliases:
          - seaweed_master
  volume:
    image: chrislusf/seaweedfs
    ports:
      - 8080:8080
    command: 'volume -max=5 -mserver="master:9333" -port=8080'
    depends_on:
      - master
    networks:
      default:
        aliases:
          - seaweed_volume
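Not part of the diff, but for orientation: with the compose file above saved under docker/, a minimal way to bring the pair up would be something like:

```
cd docker
docker-compose up -d     # starts the master (9333) and one volume server (8080)
docker-compose logs -f   # follow the logs of both services
```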

docker/entrypoint.sh (34)

@@ -0,0 +1,34 @@
#!/bin/sh

case "$1" in

  'master')
    ARGS="-ip `hostname -i` -mdir /data"
    # Is this instance linked with an other master? (Docker commandline "--link master1:master")
    if [ -n "$MASTER_PORT_9333_TCP_ADDR" ] ; then
      ARGS="$ARGS -peers=$MASTER_PORT_9333_TCP_ADDR:$MASTER_PORT_9333_TCP_PORT"
    fi
    exec /usr/bin/weed $@ $ARGS
    ;;

  'volume')
    ARGS="-ip `hostname -i` -dir /data"
    # Is this instance linked with a master? (Docker commandline "--link master1:master")
    if [ -n "$MASTER_PORT_9333_TCP_ADDR" ] ; then
      ARGS="$ARGS -mserver=$MASTER_PORT_9333_TCP_ADDR:$MASTER_PORT_9333_TCP_PORT"
    fi
    exec /usr/bin/weed $@ $ARGS
    ;;

  'server')
    ARGS="-ip `hostname -i` -dir /data"
    if [ -n "$MASTER_PORT_9333_TCP_ADDR" ] ; then
      ARGS="$ARGS -master.peers=$MASTER_PORT_9333_TCP_ADDR:$MASTER_PORT_9333_TCP_PORT"
    fi
    exec /usr/bin/weed $@ $ARGS
    ;;

  *)
    exec /usr/bin/weed $@
    ;;

esac
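A sketch (not part of this change) of how the link detection in the entrypoint is meant to be exercised, assuming the image is published as chrislusf/seaweedfs as in the compose file above:

```
# start a master container
docker run -d --name master chrislusf/seaweedfs master

# start a volume server linked to it: the legacy --link flag injects
# MASTER_PORT_9333_TCP_ADDR / MASTER_PORT_9333_TCP_PORT into the container,
# which the entrypoint turns into -mserver=<addr>:<port>
docker run -d --link master:master chrislusf/seaweedfs volume
```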

weed/command/compact.go (11)

@@ -23,6 +23,7 @@ var (
compactVolumePath = cmdCompact.Flag.String("dir", ".", "data directory to store files")
compactVolumeCollection = cmdCompact.Flag.String("collection", "", "volume collection name")
compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.")
compactMethod = cmdCompact.Flag.Int("method", 0, "option to choose which compact method. use 0 or 1.")
)
func runCompact(cmd *Command, args []string) bool {
@@ -37,8 +38,14 @@ func runCompact(cmd *Command, args []string) bool {
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
if err = v.Compact(); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
if *compactMethod == 0 {
if err = v.Compact(); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
} else {
if err = v.Compact2(); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
}
return true
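For reference (not part of the diff), the new flag would be exercised roughly like this; the data directory and volume id are illustrative:

```
# original compaction path (method 0, the default)
weed compact -dir=/data -volumeId=1

# the index-file based compaction added here as Compact2
weed compact -dir=/data -volumeId=1 -method=1
```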

weed/command/export.go (2)

@@ -51,7 +51,7 @@ func init() {
var (
output = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout")
format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename formatted with {{.Mime}} {{.Id}} {{.Name}} {{.Ext}}")
newer = cmdExport.Flag.String("newer", "", "export only files newer than this time, default is all files. Must be specified in RFC3339 without timezone")
newer = cmdExport.Flag.String("newer", "", "export only files newer than this time, default is all files. Must be specified in RFC3339 without timezone, e.g. 2006-01-02T15:04:05")
tarOutputFile *tar.Writer
tarHeader tar.Header
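A hedged example of the clarified -newer format (directory, volume id, and timestamp are illustrative):

```
# export only entries written after the given local time (RFC3339 without timezone)
weed export -dir=/data -volumeId=1 -o=/tmp/vol1.tar -newer="2016-01-02T15:04:05"
```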

weed/command/filer.go (6)

@@ -24,6 +24,8 @@ type FilerOptions struct {
dir *string
redirectOnRead *bool
disableDirListing *bool
confFile *string
maxMB *int
secretKey *string
cassandra_server *string
cassandra_keyspace *string
@@ -42,6 +44,8 @@ func init() {
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
f.confFile = cmdFiler.Flag.String("confFile", "", "json encoded filer conf file")
f.maxMB = cmdFiler.Flag.Int("maxMB", 0, "split files larger than the limit")
f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server")
f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
@@ -82,6 +86,8 @@ func runFiler(cmd *Command, args []string) bool {
r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection,
*f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing,
*f.confFile,
*f.maxMB,
*f.secretKey,
*f.cassandra_server, *f.cassandra_keyspace,
*f.redis_server, *f.redis_password, *f.redis_database,

weed/command/server.go (4)

@@ -86,6 +86,8 @@ func init() {
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.confFile = cmdServer.Flag.String("filer.confFile", "", "json encoded filer conf file")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 0, "split files larger than the limit")
filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server")
filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
@@ -169,6 +171,8 @@ func runServer(cmd *Command, args []string) bool {
_, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
*filerOptions.defaultReplicaPlacement,
*filerOptions.redirectOnRead, *filerOptions.disableDirListing,
*filerOptions.confFile,
*filerOptions.maxMB,
*filerOptions.secretKey,
*filerOptions.cassandra_server, *filerOptions.cassandra_keyspace,
*filerOptions.redis_server, *filerOptions.redis_password, *filerOptions.redis_database,

weed/filer/cassandra_store/cassandra_store.go (2)

@@ -3,6 +3,7 @@ package cassandra_store
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/gocql/gocql"
@@ -59,6 +60,7 @@ func (c *CassandraStore) Get(fullFileName string) (fid string, err error) {
fullFileName).Consistency(gocql.One).Scan(&output); err != nil {
if err != gocql.ErrNotFound {
glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err)
return "", filer.ErrNotFound
}
}
if len(output) == 0 {

weed/filer/embedded_filer/files_in_leveldb.go (4)

@@ -53,7 +53,9 @@ func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string
}
func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string) (fid string, err error) {
data, e := fl.db.Get(genKey(dirId, fileName), nil)
if e != nil {
if e == leveldb.ErrNotFound {
return "", filer.ErrNotFound
} else if e != nil {
return "", e
}
return string(data), nil

weed/filer/filer.go (6)

@@ -1,5 +1,9 @@
package filer
import (
"errors"
)
type FileId string //file id in SeaweedFS
type FileEntry struct {
@@ -26,3 +30,5 @@ type Filer interface {
DeleteDirectory(dirPath string, recursive bool) (err error)
Move(fromPath string, toPath string) (err error)
}
var ErrNotFound = errors.New("filer: no entry is found in filer store")

weed/filer/mysql_store/README.md (67)

@@ -0,0 +1,67 @@
# MySQL filer mapping store
## Schema format
Basically, `uriPath` and `fid` are the key elements stored in MySQL. For query performance and common usage patterns, it is worth adding an integer primary key as well as `createTime`, `updateTime`, and `status` fields.
Of course, you are free to customize the schema to fit your own circumstances.
<pre><code>
CREATE TABLE IF NOT EXISTS `filer_mapping` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`uriPath` char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
`fid` char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
`createTime` int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
`updateTime` int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
`remark` varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
`status` tinyint(2) DEFAULT '1' COMMENT 'resource status',
PRIMARY KEY (`id`),
UNIQUE KEY `index_uriPath` (`uriPath`)
) DEFAULT CHARSET=utf8;
</code></pre>
Unlike the other stores (redis, cassandra), the MySQL connection parameters are not passed as weed command-line options. Instead, they are read from a JSON config file. TOML, YAML, or XML would also work, but TOML and YAML require a third-party package, while XML is a bit more cumbersome.
A sample config file looks like this:
<pre><code>
{
    "mysql": [
        {
            "User": "root",
            "Password": "root",
            "HostName": "127.0.0.1",
            "Port": 3306,
            "DataBase": "seaweedfs"
        },
        {
            "User": "root",
            "Password": "root",
            "HostName": "127.0.0.2",
            "Port": 3306,
            "DataBase": "seaweedfs"
        }
    ],
    "IsSharding": true,
    "ShardCount": 1024
}
</code></pre>
The "mysql" field in the config above is an array holding every MySQL instance you have prepared for storing the (sharded) mapping data.
1. If a single MySQL instance is enough, just keep one entry in the "mysql" array.
2. If you need table sharding on a specific MySQL instance, set "IsSharding" to true and give the total number of table shards in "ShardCount".
3. If the MySQL service can be scaled transparently in your environment, configure just one MySQL instance (usually a frontend proxy or VIP) and set "IsSharding" to false.
4. If you configure more than one MySQL instance and do not want table sharding on any of them (set "IsSharding" to false), sharding across the instances still happens implicitly.
A brief usage sketch follows below.
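Assuming the JSON above is saved as /etc/seaweedfs/filer_conf.json (an illustrative path), the filer could be started with the -confFile and -maxMB flags introduced by this change:
<pre><code>
# standalone filer backed by the MySQL mapping store described above
weed filer -master=localhost:9333 -confFile=/etc/seaweedfs/filer_conf.json

# or as part of an all-in-one server
weed server -filer -filer.confFile=/etc/seaweedfs/filer_conf.json
</code></pre>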

weed/filer/mysql_store/mysql_store.go (270)

@@ -0,0 +1,270 @@
package mysql_store
import (
"database/sql"
"fmt"
"hash/crc32"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
_ "github.com/go-sql-driver/mysql"
)
const (
sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
default_maxIdleConnections = 100
default_maxOpenConnections = 50
default_maxTableNums = 1024
tableName = "filer_mapping"
)
var (
_init_db sync.Once
_db_connections []*sql.DB
)
type MySqlConf struct {
User string
Password string
HostName string
Port int
DataBase string
MaxIdleConnections int
MaxOpenConnections int
}
type ShardingConf struct {
IsSharding bool `json:"isSharding"`
ShardCount int `json:"shardCount"`
}
type MySqlStore struct {
dbs []*sql.DB
isSharding bool
shardCount int
}
func getDbConnection(confs []MySqlConf) []*sql.DB {
_init_db.Do(func() {
for _, conf := range confs {
sqlUrl := fmt.Sprintf(sqlUrl, conf.User, conf.Password, conf.HostName, conf.Port, conf.DataBase)
var dbErr error
_db_connection, dbErr := sql.Open("mysql", sqlUrl)
if dbErr != nil {
_db_connection.Close()
_db_connection = nil
panic(dbErr)
}
var maxIdleConnections, maxOpenConnections int
if conf.MaxIdleConnections != 0 {
maxIdleConnections = conf.MaxIdleConnections
} else {
maxIdleConnections = default_maxIdleConnections
}
if conf.MaxOpenConnections != 0 {
maxOpenConnections = conf.MaxOpenConnections
} else {
maxOpenConnections = default_maxOpenConnections
}
_db_connection.SetMaxIdleConns(maxIdleConnections)
_db_connection.SetMaxOpenConns(maxOpenConnections)
_db_connections = append(_db_connections, _db_connection)
}
})
return _db_connections
}
func NewMysqlStore(confs []MySqlConf, isSharding bool, shardCount int) *MySqlStore {
ms := &MySqlStore{
dbs: getDbConnection(confs),
isSharding: isSharding,
shardCount: shardCount,
}
for _, db := range ms.dbs {
if !isSharding {
ms.shardCount = 1
} else {
if ms.shardCount == 0 {
ms.shardCount = default_maxTableNums
}
}
for i := 0; i < ms.shardCount; i++ {
if err := ms.createTables(db, tableName, i); err != nil {
fmt.Printf("create table failed %v", err)
}
}
}
return ms
}
func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) {
hash_value := crc32.ChecksumIEEE([]byte(fullFileName))
instance_offset = int(hash_value) % len(s.dbs)
table_postfix = int(hash_value) % s.shardCount
return
}
func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) {
instance_offset, table_postfix := s.hash(path)
instanceId = instance_offset
if s.isSharding {
tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix)
} else {
tableFullName = tableName
}
return
}
func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) {
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return "", fmt.Errorf("MySqlStore Get operation can not parse file path %s: err is %v", fullFilePath, err)
}
fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName)
if err == sql.ErrNoRows {
//Could not found
err = filer.ErrNotFound
}
return fid, err
}
func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) {
var tableFullName string
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return fmt.Errorf("MySqlStore Put operation can not parse file path %s: err is %v", fullFilePath, err)
}
var old_fid string
if old_fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil && err != sql.ErrNoRows {
return fmt.Errorf("MySqlStore Put operation failed when querying path %s: err is %v", fullFilePath, err)
} else {
if len(old_fid) == 0 {
err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
err = fmt.Errorf("MySqlStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err)
} else {
err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
err = fmt.Errorf("MySqlStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err)
}
}
return
}
func (s *MySqlStore) Delete(fullFilePath string) (err error) {
var fid string
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return fmt.Errorf("MySqlStore Delete operation can not parse file path %s: err is %v", fullFilePath, err)
}
if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
return fmt.Errorf("MySqlStore Delete operation failed when querying path %s: err is %v", fullFilePath, err)
} else if fid == "" {
return nil
}
if err = s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
return fmt.Errorf("MySqlStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err)
} else {
return nil
}
}
func (s *MySqlStore) Close() {
for _, db := range s.dbs {
db.Close()
}
}
var createTable = `
CREATE TABLE IF NOT EXISTS %s (
id bigint(20) NOT NULL AUTO_INCREMENT,
uriPath char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
fid char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
createTime int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
updateTime int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
remark varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
status tinyint(2) DEFAULT '1' COMMENT 'resource status',
PRIMARY KEY (id),
UNIQUE KEY index_uriPath (uriPath)
) DEFAULT CHARSET=utf8;
`
func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error {
var realTableName string
if s.isSharding {
realTableName = fmt.Sprintf("%s_%4d", tableName, postfix)
} else {
realTableName = tableName
}
stmt, err := db.Prepare(fmt.Sprintf(createTable, realTableName))
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec()
if err != nil {
return err
}
return nil
}
func (s *MySqlStore) query(uriPath string, db *sql.DB, tableName string) (string, error) {
sqlStatement := "SELECT fid FROM %s WHERE uriPath=?"
row := db.QueryRow(fmt.Sprintf(sqlStatement, tableName), uriPath)
var fid string
err := row.Scan(&fid)
if err != nil {
return "", err
}
return fid, nil
}
func (s *MySqlStore) update(uriPath string, fid string, db *sql.DB, tableName string) error {
sqlStatement := "UPDATE %s SET fid=?, updateTime=? WHERE uriPath=?"
res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), fid, time.Now().Unix(), uriPath)
if err != nil {
return err
}
_, err = res.RowsAffected()
if err != nil {
return err
}
return nil
}
func (s *MySqlStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error {
sqlStatement := "INSERT INTO %s (uriPath,fid,createTime) VALUES(?,?,?)"
res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath, fid, time.Now().Unix())
if err != nil {
return err
}
_, err = res.RowsAffected()
if err != nil {
return err
}
return nil
}
func (s *MySqlStore) delete(uriPath string, db *sql.DB, tableName string) error {
sqlStatement := "DELETE FROM %s WHERE uriPath=?"
res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath)
if err != nil {
return err
}
_, err = res.RowsAffected()
if err != nil {
return err
}
return nil
}

weed/filer/mysql_store/mysql_store_test.go (30)

@@ -0,0 +1,30 @@
package mysql_store
import (
"encoding/json"
"hash/crc32"
"testing"
)
func TestGenerateMysqlConf(t *testing.T) {
var conf []MySqlConf
conf = append(conf, MySqlConf{
User: "root",
Password: "root",
HostName: "localhost",
Port: 3306,
DataBase: "seaweedfs",
})
body, err := json.Marshal(conf)
if err != nil {
t.Errorf("json encoding err %s", err.Error())
}
t.Logf("json output is %s", string(body))
}
func TestCRC32FullPathName(t *testing.T) {
fullPathName := "/prod-bucket/law632191483895612493300-signed.pdf"
hash_value := crc32.ChecksumIEEE([]byte(fullPathName))
table_postfix := int(hash_value) % 1024
t.Logf("table postfix %d", table_postfix)
}

weed/filer/redis_store/redis_store.go (4)

@@ -1,6 +1,8 @@
package redis_store
import (
"github.com/chrislusf/seaweedfs/weed/filer"
redis "gopkg.in/redis.v2"
)
@@ -20,7 +22,7 @@ func NewRedisStore(hostPort string, password string, database int) *RedisStore {
func (s *RedisStore) Get(fullFileName string) (fid string, err error) {
fid, err = s.Client.Get(fullFileName).Result()
if err == redis.Nil {
err = nil
err = filer.ErrNotFound
}
return fid, err
}

weed/server/filer_server.go (42)

@@ -1,8 +1,10 @@
package weed_server
import (
"encoding/json"
"math/rand"
"net/http"
"os"
"strconv"
"sync"
"time"
@@ -11,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer/cassandra_store"
"github.com/chrislusf/seaweedfs/weed/filer/embedded_filer"
"github.com/chrislusf/seaweedfs/weed/filer/flat_namespace"
"github.com/chrislusf/seaweedfs/weed/filer/mysql_store"
"github.com/chrislusf/seaweedfs/weed/filer/redis_store"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -18,6 +21,26 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
type filerConf struct {
MysqlConf []mysql_store.MySqlConf `json:"mysql"`
mysql_store.ShardingConf
}
func parseConfFile(confPath string) (*filerConf, error) {
var setting filerConf
configFile, err := os.Open(confPath)
defer configFile.Close()
if err != nil {
return nil, err
}
jsonParser := json.NewDecoder(configFile)
if err = jsonParser.Decode(&setting); err != nil {
return nil, err
}
return &setting, nil
}
type FilerServer struct {
port string
master string
@@ -28,11 +51,14 @@ type FilerServer struct {
disableDirListing bool
secret security.Secret
filer filer.Filer
maxMB int
masterNodes *storage.MasterNodes
}
func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string,
replication string, redirectOnRead bool, disableDirListing bool,
confFile string,
maxMB int,
secret string,
cassandra_server string, cassandra_keyspace string,
redis_server string, redis_password string, redis_database int,
@@ -43,10 +69,24 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
defaultReplication: replication,
redirectOnRead: redirectOnRead,
disableDirListing: disableDirListing,
maxMB: maxMB,
port: ip + ":" + strconv.Itoa(port),
}
if cassandra_server != "" {
var setting *filerConf
if confFile != "" {
setting, err = parseConfFile(confFile)
if err != nil {
return nil, err
}
} else {
setting = new(filerConf)
}
if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 {
mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardCount)
fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store)
} else if cassandra_server != "" {
cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
if err != nil {
glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)

weed/server/filer_server_handlers_read.go (3)

@@ -7,6 +7,7 @@ import (
"strconv"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui"
@@ -87,7 +88,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
fileId, err := fs.filer.FindFile(r.URL.Path)
if err == leveldb.ErrNotFound {
if err == filer.ErrNotFound {
glog.V(3).Infoln("Not found in db", r.URL.Path)
w.WriteHeader(http.StatusNotFound)
return

weed/server/filer_server_handlers_write.go (220)

@@ -15,11 +15,13 @@ import (
"net/url"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
"path"
"strconv"
)
type FilerPostResult struct {
@@ -71,17 +73,17 @@ func makeFormData(filename, mimeType string, content io.Reader) (formData io.Rea
}
func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) {
if fileId, err = fs.filer.FindFile(path); err != nil && err != leveldb.ErrNotFound {
if fileId, err = fs.filer.FindFile(path); err != nil && err != filer.ErrNotFound {
glog.V(0).Infoln("failing to find path in filer store", path, err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err)
return
} else if fileId != "" && err == nil {
urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId)
if err != nil {
glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error())
w.WriteHeader(http.StatusNotFound)
return
}
} else if fileId == "" && err == filer.ErrNotFound {
w.WriteHeader(http.StatusNotFound)
}
return
}
@@ -217,6 +219,7 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R
}
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
replication := query.Get("replication")
if replication == "" {
@@ -227,6 +230,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
collection = fs.collection
}
if autoChunked := fs.autoChunk(w, r, replication, collection); autoChunked {
return
}
var fileId, urlLocation string
var err error
@@ -243,7 +250,17 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
u, _ := url.Parse(urlLocation)
// This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off
// because they need to provide FIDs instead of file paths...
cm, _ := strconv.ParseBool(query.Get("cm"))
if cm {
q := u.Query()
q.Set("cm", "true")
u.RawQuery = q.Encode()
}
glog.V(4).Infoln("post to", u)
request := &http.Request{
Method: r.Method,
URL: u,
@@ -298,6 +315,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "PUT" {
if oldFid, err := fs.filer.FindFile(path); err == nil {
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
} else if err != nil && err != filer.ErrNotFound {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
}
}
@@ -319,6 +338,199 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
writeJsonQuiet(w, r, http.StatusCreated, reply)
}
func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string) bool {
if r.Method != "POST" {
glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
return false
}
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
query := r.URL.Query()
parsedMaxMB, _ := strconv.ParseInt(query.Get("maxMB"), 10, 32)
maxMB := int32(parsedMaxMB)
if maxMB <= 0 && fs.maxMB > 0 {
maxMB = int32(fs.maxMB)
}
if maxMB <= 0 {
glog.V(4).Infoln("AutoChunking not enabled")
return false
}
glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)")
chunkSize := 1024 * 1024 * maxMB
contentLength := int64(0)
if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 {
contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64)
if contentLength <= int64(chunkSize) {
glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.")
return false
}
}
if contentLength <= 0 {
glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.")
return false
}
reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil {
writeJsonQuiet(w, r, http.StatusCreated, reply)
}
return true
}
func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string) (filerResult *FilerPostResult, replyerr error) {
multipartReader, multipartReaderErr := r.MultipartReader()
if multipartReaderErr != nil {
return nil, multipartReaderErr
}
part1, part1Err := multipartReader.NextPart()
if part1Err != nil {
return nil, part1Err
}
fileName := part1.FileName()
if fileName != "" {
fileName = path.Base(fileName)
}
chunks := (int64(contentLength) / int64(chunkSize)) + 1
cm := operation.ChunkManifest{
Name: fileName,
Size: 0, // don't know yet
Mime: "application/octet-stream",
Chunks: make([]*operation.ChunkInfo, 0, chunks),
}
totalBytesRead := int64(0)
tmpBufferSize := int32(1024 * 1024)
tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize))
chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow
chunkBufOffset := int32(0)
chunkOffset := int64(0)
writtenChunks := 0
filerResult = &FilerPostResult{
Name: fileName,
}
for totalBytesRead < contentLength {
tmpBuffer.Reset()
bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize))
readFully := readErr != nil && readErr == io.EOF
tmpBuf := tmpBuffer.Bytes()
bytesToCopy := tmpBuf[0:int(bytesRead)]
copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy)
chunkBufOffset = chunkBufOffset + int32(bytesRead)
if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) {
writtenChunks = writtenChunks + 1
fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection)
if assignErr != nil {
return nil, assignErr
}
// upload the chunk to the volume server
chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(cm.Chunks.Len()+1), 10)
uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId)
if uploadErr != nil {
return nil, uploadErr
}
// Save to chunk manifest structure
cm.Chunks = append(cm.Chunks,
&operation.ChunkInfo{
Offset: chunkOffset,
Size: int64(chunkBufOffset),
Fid: fileId,
},
)
// reset variables for the next chunk
chunkBufOffset = 0
chunkOffset = totalBytesRead + int64(bytesRead)
}
totalBytesRead = totalBytesRead + int64(bytesRead)
if bytesRead == 0 || readFully {
break
}
if readErr != nil {
return nil, readErr
}
}
cm.Size = totalBytesRead
manifestBuf, marshalErr := cm.Marshal()
if marshalErr != nil {
return nil, marshalErr
}
manifestStr := string(manifestBuf)
glog.V(4).Infoln("Generated chunk manifest: ", manifestStr)
manifestFileId, manifestUrlLocation, manifestAssignmentErr := fs.assignNewFileInfo(w, r, replication, collection)
if manifestAssignmentErr != nil {
return nil, manifestAssignmentErr
}
glog.V(4).Infoln("Manifest uploaded to:", manifestUrlLocation, "Fid:", manifestFileId)
filerResult.Fid = manifestFileId
u, _ := url.Parse(manifestUrlLocation)
q := u.Query()
q.Set("cm", "true")
u.RawQuery = q.Encode()
manifestUploadErr := fs.doUpload(u.String(), w, r, manifestBuf, fileName+"_manifest", "application/json", manifestFileId)
if manifestUploadErr != nil {
return nil, manifestUploadErr
}
path := r.URL.Path
// also delete the old fid unless PUT operation
if r.Method != "PUT" {
if oldFid, err := fs.filer.FindFile(path); err == nil {
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
} else if err != nil && err != filer.ErrNotFound {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
}
}
glog.V(4).Infoln("saving", path, "=>", manifestFileId)
if db_err := fs.filer.CreateFile(path, manifestFileId); db_err != nil {
replyerr = db_err
filerResult.Error = db_err.Error()
operation.DeleteFile(fs.getMasterNode(), manifestFileId, fs.jwt(manifestFileId)) //clean up
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
return
}
return
}
func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) {
err = nil
ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf))
uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId))
if uploadResult != nil {
glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size)
}
if uploadError != nil {
err = uploadError
}
return
}
// curl -X DELETE http://localhost:8888/path/to
// curl -X DELETE http://localhost:8888/path/to?recursive=true
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
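For orientation (not part of the diff): with the auto-chunking path added above, a client can opt in per request via the maxMB query parameter that autoChunk() parses, or globally via the new -maxMB / -filer.maxMB flags. A hedged example against a filer on the default port 8888; the chunk size and file are illustrative:

```
# split a large multipart upload into 16 MB chunks plus a chunk manifest
curl -F file=@/tmp/big.iso "http://localhost:8888/path/to/big.iso?maxMB=16"
```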

weed/storage/compact_map.go (2)

@@ -41,10 +41,10 @@ func NewCompactSection(start Key) *CompactSection {
//return old entry size
func (cs *CompactSection) Set(key Key, offset uint32, size uint32) uint32 {
ret := uint32(0)
cs.Lock()
if key > cs.end {
cs.end = key
}
cs.Lock()
if i := cs.binarySearchValues(key); i >= 0 {
ret = cs.values[i].Size
//println("key", key, "old size", ret)

weed/storage/volume.go (3)

@@ -23,6 +23,9 @@ type Volume struct {
dataFileAccessLock sync.Mutex
lastModifiedTime uint64 //unix time in seconds
lastCompactIndexOffset uint64
lastCompactRevision uint16
}
func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {

weed/storage/volume_checking.go (1)

@@ -21,7 +21,6 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error {
return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
}
key, offset, size := idxFileEntry(lastIdxEntry)
//deleted index entry could not point to deleted needle
if offset == 0 {
return nil
}

weed/storage/volume_vacuum.go (234)

@@ -6,6 +6,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (v *Volume) garbageLevel() float64 {
@@ -20,9 +21,19 @@ func (v *Volume) Compact() error {
//glog.V(3).Infof("Got Compaction lock...")
filePath := v.FileName()
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
v.lastCompactIndexOffset = v.nm.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactRevision
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
}
func (v *Volume) Compact2() error {
glog.V(3).Infof("Compact2 ...")
filePath := v.FileName()
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx")
}
func (v *Volume) commitCompact() error {
glog.V(3).Infof("Committing vacuuming...")
v.dataFileAccessLock.Lock()
@@ -30,13 +41,28 @@ func (v *Volume) commitCompact() error {
glog.V(3).Infof("Got Committing lock...")
v.nm.Close()
_ = v.dataFile.Close()
var e error
if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
return e
}
if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
return e
if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
glog.V(0).Infof("makeupDiff in commitCompact failed %v", e)
e = os.Remove(v.FileName() + ".cpd")
if e != nil {
return e
}
e = os.Remove(v.FileName() + ".cpx")
if e != nil {
return e
}
} else {
var e error
if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
return e
}
if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
return e
}
}
//glog.V(3).Infof("Pretending to be vacuuming...")
//time.Sleep(20 * time.Second)
glog.V(3).Infof("Loading Commit file...")
@@ -46,6 +72,141 @@ func (v *Volume) commitCompact() error {
return nil
}
func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) {
if _, err = file.Seek(0, 0); err != nil {
return 0, fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err)
}
header := make([]byte, SuperBlockSize)
if _, e := file.Read(header); e != nil {
return 0, fmt.Errorf("cannot read file %s 's super block: %v", file.Name(), e)
}
superBlock, err := ParseSuperBlock(header)
if err != nil {
return 0, err
}
return superBlock.CompactRevision, nil
}
func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) {
var indexSize int64
oldIdxFile, err := os.Open(oldIdxFileName)
defer oldIdxFile.Close()
oldDatFile, err := os.Open(oldDatFileName)
defer oldDatFile.Close()
if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil {
return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
}
if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset {
return nil
}
oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatFile)
if err != nil {
return
}
if oldDatCompactRevision != v.lastCompactRevision {
return fmt.Errorf("current old dat file's compact revision %d is not the expected one %d", oldDatCompactRevision, v.lastCompactRevision)
}
type keyField struct {
offset uint32
size uint32
}
incrementedHasUpdatedIndexEntry := make(map[uint64]keyField)
for idx_offset := indexSize - NeedleIndexSize; uint64(idx_offset) >= v.lastCompactIndexOffset; idx_offset -= NeedleIndexSize {
var IdxEntry []byte
if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil {
return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err)
}
key, offset, size := idxFileEntry(IdxEntry)
if _, found := incrementedHasUpdatedIndexEntry[key]; !found {
incrementedHasUpdatedIndexEntry[key] = keyField{
offset: offset,
size: size,
}
}
}
if len(incrementedHasUpdatedIndexEntry) > 0 {
var (
dst, idx *os.File
)
if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil {
return
}
defer dst.Close()
if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil {
return
}
defer idx.Close()
var newDatCompactRevision uint16
newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst)
if err != nil {
return
}
if oldDatCompactRevision+1 != newDatCompactRevision {
return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision)
}
idx_entry_bytes := make([]byte, 16)
for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry {
util.Uint64toBytes(idx_entry_bytes[0:8], key)
util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset)
util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size)
var offset int64
if offset, err = dst.Seek(0, 2); err != nil {
glog.V(0).Infof("failed to seek the end of file: %v", err)
return
}
//ensure file writing starting from aligned positions
if offset%NeedlePaddingSize != 0 {
offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
if offset, err = v.dataFile.Seek(offset, 0); err != nil {
glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
return
}
}
//updated needle
if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 {
//even the needle cache in memory is hit, the need_bytes is correct
var needle_bytes []byte
needle_bytes, _, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
if err != nil {
return
}
dst.Write(needle_bytes)
util.Uint32toBytes(idx_entry_bytes[8:12], uint32(offset/NeedlePaddingSize))
} else { //deleted needle
//fakeDelNeedle 's default Data field is nil
fakeDelNeedle := new(Needle)
fakeDelNeedle.Id = key
fakeDelNeedle.Cookie = 0x12345678
_, err = fakeDelNeedle.Append(dst, v.Version())
if err != nil {
return
}
util.Uint32toBytes(idx_entry_bytes[8:12], uint32(0))
}
if _, err := idx.Seek(0, 2); err != nil {
return fmt.Errorf("cannot seek end of indexfile %s: %v",
newIdxFileName, err)
}
_, err = idx.Write(idx_entry_bytes)
}
}
return nil
}
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
var (
dst, idx *os.File
@@ -91,3 +252,64 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
return
}
func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
var (
dst, idx, oldIndexFile *os.File
)
if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
return
}
defer dst.Close()
if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
return
}
defer idx.Close()
if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil {
return
}
defer oldIndexFile.Close()
nm := NewNeedleMap(idx)
now := uint64(time.Now().Unix())
v.SuperBlock.CompactRevision++
dst.Write(v.SuperBlock.Bytes())
new_offset := int64(SuperBlockSize)
WalkIndexFile(oldIndexFile, func(key uint64, offset, size uint32) error {
if size <= 0 {
return nil
}
nv, ok := v.nm.Get(key)
if !ok {
return nil
}
n := new(Needle)
n.ReadData(v.dataFile, int64(offset)*NeedlePaddingSize, size, v.Version())
defer n.ReleaseMemory()
if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
return nil
}
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if nv.Offset == offset && nv.Size > 0 {
if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
if _, err = n.Append(dst, v.Version()); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
new_offset += n.DiskSize()
glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size)
}
return nil
})
return
}

weed/storage/volume_vacuum_test.go (55)

@@ -0,0 +1,55 @@
package storage
import (
"testing"
)
/*
makediff test steps
1. launch weed server at your local/dev environment, (option
"garbageThreshold" for master and option "max" for volume should be set with specific value which would let
preparing test prerequisite easier )
a) ./weed master -garbageThreshold=0.99 -mdir=./m
b) ./weed volume -dir=./data -max=1 -mserver=localhost:9333 -port=8080
2. upload 4 different files, you could call dir/assign to get 4 different fids
a) upload file A with fid a
b) upload file B with fid b
c) upload file C with fid c
d) upload file D with fid d
3. update file A and C
a) modify file A and upload file A with fid a
b) modify file C and upload file C with fid c
c) record the current 1.idx's file size(lastCompactIndexOffset value)
4. Compacting the data file
a) run curl http://localhost:8080/admin/vacuum/compact?volumeId=1
b) verify the 1.cpd and 1.cpx is created under volume directory
5. update file B and delete file D
a) modify file B and upload file B with fid b
d) delete file B with fid b
6. Now you could run the following UT case, the case should be run successfully
7. Compact commit manually
a) mv 1.cpd 1.dat
b) mv 1.cpx 1.idx
8. Restart Volume Server
9. Now you should get updated file A,B,C
*/
func TestMakeDiff(t *testing.T) {
v := new(Volume)
//lastCompactIndexOffset value is the index file size before step 4
v.lastCompactIndexOffset = 96
v.SuperBlock.version = 0x2
/*
err := v.makeupDiff(
"/yourpath/1.cpd",
"/yourpath/1.cpx",
"/yourpath/1.dat",
"/yourpath/1.idx")
if err != nil {
t.Errorf("makeupDiff err is %v", err)
} else {
t.Log("makeupDiff Succeeded")
}
*/
}

weed/topology/data_node.go (17)

@@ -53,7 +53,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
dn.RLock()
dn.Lock()
for vid, v := range dn.volumes {
if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid)
@@ -62,8 +62,8 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo
dn.UpAdjustVolumeCountDelta(-1)
dn.UpAdjustActiveVolumeCountDelta(-1)
}
} //TODO: adjust max volume id, if need to reclaim volume ids
dn.RUnlock()
}
dn.Unlock()
for _, v := range actualVolumes {
dn.AddOrUpdateVolume(v)
}
@@ -79,6 +79,17 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
return ret
}
func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) {
dn.RLock()
defer dn.RUnlock()
v_info, ok := dn.volumes[id]
if ok {
return v_info, nil
} else {
return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
}
}
func (dn *DataNode) GetDataCenter() *DataCenter {
return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
}

weed/topology/volume_layout.go (13)

@@ -45,6 +45,19 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
vl.vid2location[v.Id].Set(dn)
glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
for _, dn := range vl.vid2location[v.Id].list {
if v_info, err := dn.GetVolumesById(v.Id); err == nil {
if v_info.ReadOnly {
glog.V(3).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id)
return
}
} else {
glog.V(3).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id)
return
}
}
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
if _, ok := vl.oversizedVolumes[v.Id]; !ok {
vl.addToWritable(v.Id)
