diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index b2291c15f..000000000 --- a/Dockerfile +++ /dev/null @@ -1,21 +0,0 @@ -FROM progrium/busybox - -WORKDIR /opt/weed - -RUN opkg-install curl -RUN echo tlsv1 >> ~/.curlrc - -RUN \ - curl -Lks https://bintray.com$(curl -Lk http://bintray.com/chrislusf/seaweedfs/seaweedfs/_latestVersion | grep linux_amd64.tar.gz | sed -n "/href/ s/.*href=['\"]\([^'\"]*\)['\"].*/\1/gp") | gunzip | tar -xf - -C /opt/weed/ && \ - mkdir ./bin && mv weed_*/* ./bin && \ - chmod +x ./bin/weed - -EXPOSE 8080 -EXPOSE 9333 - -VOLUME /data - -ENV WEED_HOME /opt/weed -ENV PATH ${PATH}:${WEED_HOME}/bin - -ENTRYPOINT ["weed"] diff --git a/README.md b/README.md index 3582bdb32..41b8f6be3 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,6 @@ SeaweedFS uses HTTP REST operations to write, read, delete. The responses are in ``` > ./weed master - ``` ### Start Volume Servers ### @@ -61,9 +60,9 @@ SeaweedFS uses HTTP REST operations to write, read, delete. The responses are in ``` > weed volume -dir="/tmp/data1" -max=5 -mserver="localhost:9333" -port=8080 & > weed volume -dir="/tmp/data2" -max=10 -mserver="localhost:9333" -port=8081 & - ``` + ### Write File ### To upload a file: first, send a HTTP POST, PUT, or GET request to `/dir/assign` to get an `fid` and a volume server url: diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 000000000..21e5a7b47 --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,18 @@ +FROM progrium/busybox + +RUN opkg-install curl +RUN echo tlsv1 >> ~/.curlrc + +RUN curl -Lks https://bintray.com$(curl -Lk http://bintray.com/chrislusf/seaweedfs/seaweedfs/_latestVersion | grep linux_amd64.tar.gz | sed -n "/href/ s/.*href=['\"]\([^'\"]*\)['\"].*/\1/gp") | gunzip | tar -xf - && \ + mv go_*amd64/weed /usr/bin/ && \ + rm -r go_*amd64 + +EXPOSE 8080 +EXPOSE 9333 + +VOLUME /data + +COPY entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh + +ENTRYPOINT ["/entrypoint.sh"] diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 000000000..3ccf596d5 --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,23 @@ +version: '2' + +services: + master: + image: chrislusf/seaweedfs + ports: + - 9333:9333 + command: "master" + networks: + default: + aliases: + - seaweed_master + volume: + image: chrislusf/seaweedfs + ports: + - 8080:8080 + command: 'volume -max=5 -mserver="master:9333" -port=8080' + depends_on: + - master + networks: + default: + aliases: + - seaweed_volume \ No newline at end of file diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh new file mode 100755 index 000000000..34ab61148 --- /dev/null +++ b/docker/entrypoint.sh @@ -0,0 +1,34 @@ +#!/bin/sh + +case "$1" in + + 'master') + ARGS="-ip `hostname -i` -mdir /data" + # Is this instance linked with an other master? (Docker commandline "--link master1:master") + if [ -n "$MASTER_PORT_9333_TCP_ADDR" ] ; then + ARGS="$ARGS -peers=$MASTER_PORT_9333_TCP_ADDR:$MASTER_PORT_9333_TCP_PORT" + fi + exec /usr/bin/weed $@ $ARGS + ;; + + 'volume') + ARGS="-ip `hostname -i` -dir /data" + # Is this instance linked with a master? 
(Docker commandline "--link master1:master") + if [ -n "$MASTER_PORT_9333_TCP_ADDR" ] ; then + ARGS="$ARGS -mserver=$MASTER_PORT_9333_TCP_ADDR:$MASTER_PORT_9333_TCP_PORT" + fi + exec /usr/bin/weed $@ $ARGS + ;; + + 'server') + ARGS="-ip `hostname -i` -dir /data" + if [ -n "$MASTER_PORT_9333_TCP_ADDR" ] ; then + ARGS="$ARGS -master.peers=$MASTER_PORT_9333_TCP_ADDR:$MASTER_PORT_9333_TCP_PORT" + fi + exec /usr/bin/weed $@ $ARGS + ;; + + *) + exec /usr/bin/weed $@ + ;; +esac diff --git a/weed/command/compact.go b/weed/command/compact.go index ba2fbf867..db11880ec 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -23,6 +23,7 @@ var ( compactVolumePath = cmdCompact.Flag.String("dir", ".", "data directory to store files") compactVolumeCollection = cmdCompact.Flag.String("collection", "", "volume collection name") compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.") + compactMethod = cmdCompact.Flag.Int("method", 0, "option to choose which compact method. use 0 or 1.") ) func runCompact(cmd *Command, args []string) bool { @@ -37,8 +38,14 @@ func runCompact(cmd *Command, args []string) bool { if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } - if err = v.Compact(); err != nil { - glog.Fatalf("Compact Volume [ERROR] %s\n", err) + if *compactMethod == 0 { + if err = v.Compact(); err != nil { + glog.Fatalf("Compact Volume [ERROR] %s\n", err) + } + } else { + if err = v.Compact2(); err != nil { + glog.Fatalf("Compact Volume [ERROR] %s\n", err) + } } return true diff --git a/weed/command/export.go b/weed/command/export.go index 481aa111b..5a7dc71d9 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -51,7 +51,7 @@ func init() { var ( output = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout") format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename formatted with {{.Mime}} {{.Id}} {{.Name}} {{.Ext}}") - newer = cmdExport.Flag.String("newer", "", "export only files newer than this time, default is all files. Must be specified in RFC3339 without timezone") + newer = cmdExport.Flag.String("newer", "", "export only files newer than this time, default is all files. Must be specified in RFC3339 without timezone, e.g. 
2006-01-02T15:04:05") tarOutputFile *tar.Writer tarHeader tar.Header diff --git a/weed/command/filer.go b/weed/command/filer.go index 582d4e9c8..7d90707a6 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -24,6 +24,8 @@ type FilerOptions struct { dir *string redirectOnRead *bool disableDirListing *bool + confFile *string + maxMB *int secretKey *string cassandra_server *string cassandra_keyspace *string @@ -42,6 +44,8 @@ func init() { f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") + f.confFile = cmdFiler.Flag.String("confFile", "", "json encoded filer conf file") + f.maxMB = cmdFiler.Flag.Int("maxMB", 0, "split files larger than the limit") f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server") f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server") f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") @@ -82,6 +86,8 @@ func runFiler(cmd *Command, args []string) bool { r := http.NewServeMux() _, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection, *f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing, + *f.confFile, + *f.maxMB, *f.secretKey, *f.cassandra_server, *f.cassandra_keyspace, *f.redis_server, *f.redis_password, *f.redis_database, diff --git a/weed/command/server.go b/weed/command/server.go index 1211c7137..eed7dcae4 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -86,6 +86,8 @@ func init() { filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") + filerOptions.confFile = cmdServer.Flag.String("filer.confFile", "", "json encoded filer conf file") + filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 0, "split files larger than the limit") filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") @@ -169,6 +171,8 @@ func runServer(cmd *Command, args []string) bool { _, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, *filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead, *filerOptions.disableDirListing, + *filerOptions.confFile, + *filerOptions.maxMB, *filerOptions.secretKey, *filerOptions.cassandra_server, *filerOptions.cassandra_keyspace, *filerOptions.redis_server, *filerOptions.redis_password, *filerOptions.redis_database, diff --git a/weed/filer/cassandra_store/cassandra_store.go b/weed/filer/cassandra_store/cassandra_store.go 
index cdb9d3e3c..50a792a65 100644
--- a/weed/filer/cassandra_store/cassandra_store.go
+++ b/weed/filer/cassandra_store/cassandra_store.go
@@ -3,6 +3,7 @@ package cassandra_store
 import (
 	"fmt"
 
+	"github.com/chrislusf/seaweedfs/weed/filer"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 
 	"github.com/gocql/gocql"
@@ -59,6 +60,7 @@ func (c *CassandraStore) Get(fullFileName string) (fid string, err error) {
 		fullFileName).Consistency(gocql.One).Scan(&output); err != nil {
 		if err != gocql.ErrNotFound {
 			glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err)
+			return "", filer.ErrNotFound
 		}
 	}
 	if len(output) == 0 {
diff --git a/weed/filer/embedded_filer/files_in_leveldb.go b/weed/filer/embedded_filer/files_in_leveldb.go
index 19f6dd7e8..c40d7adaf 100644
--- a/weed/filer/embedded_filer/files_in_leveldb.go
+++ b/weed/filer/embedded_filer/files_in_leveldb.go
@@ -53,7 +53,9 @@ func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string
 }
 func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string) (fid string, err error) {
 	data, e := fl.db.Get(genKey(dirId, fileName), nil)
-	if e != nil {
+	if e == leveldb.ErrNotFound {
+		return "", filer.ErrNotFound
+	} else if e != nil {
 		return "", e
 	}
 	return string(data), nil
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index fd23e119c..5d5acb68d 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -1,5 +1,9 @@
 package filer
 
+import (
+	"errors"
+)
+
 type FileId string //file id in SeaweedFS
 
 type FileEntry struct {
@@ -26,3 +30,5 @@ type Filer interface {
 	DeleteDirectory(dirPath string, recursive bool) (err error)
 	Move(fromPath string, toPath string) (err error)
 }
+
+var ErrNotFound = errors.New("filer: no entry is found in filer store")
diff --git a/weed/filer/mysql_store/README.md b/weed/filer/mysql_store/README.md
new file mode 100644
index 000000000..6efeb1c54
--- /dev/null
+++ b/weed/filer/mysql_store/README.md
@@ -0,0 +1,67 @@
+# MySQL filer mapping store
+
+## Schema format
+
+
+Basically, uriPath and fid are the key elements stored in MySQL. For optimization and ease of use,
+adding an integer primary key plus createTime, updateTime, and status fields is usually worthwhile.
+Of course, you can freely customize the schema for your concrete circumstances.
+
+```sql
+CREATE TABLE IF NOT EXISTS `filer_mapping` (
+  `id` bigint(20) NOT NULL AUTO_INCREMENT,
+  `uriPath` char(255) NOT NULL DEFAULT "" COMMENT 'http uriPath',
+  `fid` char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
+  `createTime` int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
+  `updateTime` int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
+  `remark` varchar(20) NOT NULL DEFAULT "" COMMENT 'reserved field',
+  `status` tinyint(2) DEFAULT '1' COMMENT 'resource status',
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `index_uriPath` (`uriPath`)
+) DEFAULT CHARSET=utf8;
+```
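+
+When table sharding is enabled (see the config section below), the store keeps one such table per shard and routes each path by a CRC32 checksum, mirroring hash() and parseFilerMappingInfo() in mysql_store.go below. A minimal, runnable sketch of that routing (instance count, shard count, and path are illustrative):
+
+```go
+package main
+
+import (
+	"fmt"
+	"hash/crc32"
+)
+
+func main() {
+	// Illustrative values: 2 MySQL instances (as in the sample config below)
+	// and 1024 table shards (the default ShardCount).
+	const instances, shardCount = 2, 1024
+
+	path := "/prod-bucket/law632191483895612493300-signed.pdf" // same path as the unit test
+	h := crc32.ChecksumIEEE([]byte(path))
+
+	fmt.Printf("instance offset: %d\n", int(h)%instances)        // which MySQL instance serves this path
+	fmt.Printf("table: filer_mapping_%04d\n", int(h)%shardCount) // which table shard is queried
+}
+```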
+
+
+The MySQL config params are not added to the weed command options the way the other stores (Redis, Cassandra) are. Instead,
+we read them from a JSON config file. TOML, YAML, or XML would also work, but TOML and YAML require third-party packages,
+while XML is a little more complex.
+
+The sample config file's content is below:
+
+```json
+{
+    "mysql": [
+        {
+            "User": "root",
+            "Password": "root",
+            "HostName": "127.0.0.1",
+            "Port": 3306,
+            "DataBase": "seaweedfs"
+        },
+        {
+            "User": "root",
+            "Password": "root",
+            "HostName": "127.0.0.2",
+            "Port": 3306,
+            "DataBase": "seaweedfs"
+        }
+    ],
+    "IsSharding":true,
+    "ShardCount":1024
+}
+```
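+
+For reference, this PR wires the file in through the new `-confFile` flag on `weed filer` (and `-filer.confFile` on `weed server`). Assuming the sample above is saved as `/etc/seaweedfs/filer.json` (path illustrative, and the `-filer` switch is assumed to enable the embedded filer on `weed server`):
+
+```
+> weed filer -confFile=/etc/seaweedfs/filer.json
+> weed server -filer -filer.confFile=/etc/seaweedfs/filer.json
+```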
+
+
+The "mysql" field in the conf file above is an array that includes all the MySQL instances you have prepared for storing the sharded data.
+
+1. If one MySQL instance is enough, just keep a single instance in the "mysql" field.
+
+2. If table sharding within a specific MySQL instance is needed, set the "IsSharding" field to true and specify the total number of table shards in the "ShardCount" field.
+
+3. If the MySQL service can be auto-scaled transparently in your environment, just configure one MySQL instance (usually a frontend proxy or VIP) and set "IsSharding" to false.
+
+4. If you prepare more than one MySQL instance but have no plan to use table sharding on any of them ("IsSharding" false), instance sharding will still be done implicitly.
+
+
+
+
diff --git a/weed/filer/mysql_store/mysql_store.go b/weed/filer/mysql_store/mysql_store.go
new file mode 100644
index 000000000..6910206ce
--- /dev/null
+++ b/weed/filer/mysql_store/mysql_store.go
@@ -0,0 +1,270 @@
+package mysql_store
+
+import (
+	"database/sql"
+	"fmt"
+	"hash/crc32"
+	"sync"
+	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/filer"
+
+	_ "github.com/go-sql-driver/mysql"
+)
+
+const (
+	sqlUrl                     = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
+	default_maxIdleConnections = 100
+	default_maxOpenConnections = 50
+	default_maxTableNums       = 1024
+	tableName                  = "filer_mapping"
+)
+
+var (
+	_init_db        sync.Once
+	_db_connections []*sql.DB
+)
+
+type MySqlConf struct {
+	User               string
+	Password           string
+	HostName           string
+	Port               int
+	DataBase           string
+	MaxIdleConnections int
+	MaxOpenConnections int
+}
+
+type ShardingConf struct {
+	IsSharding bool `json:"isSharding"`
+	ShardCount int  `json:"shardCount"`
+}
+
+type MySqlStore struct {
+	dbs        []*sql.DB
+	isSharding bool
+	shardCount int
+}
+
+func getDbConnection(confs []MySqlConf) []*sql.DB {
+	_init_db.Do(func() {
+		for _, conf := range confs {
+
+			dsn := fmt.Sprintf(sqlUrl, conf.User, conf.Password, conf.HostName, conf.Port, conf.DataBase)
+			_db_connection, dbErr := sql.Open("mysql", dsn)
+			if dbErr != nil {
+				// sql.Open only fails on a bad driver/DSN, so there is no live connection to close here
+				panic(dbErr)
+			}
+			var maxIdleConnections, maxOpenConnections int
+
+			if conf.MaxIdleConnections != 0 {
+				maxIdleConnections = conf.MaxIdleConnections
+			} else {
+				maxIdleConnections = default_maxIdleConnections
+			}
+			if conf.MaxOpenConnections != 0 {
+				maxOpenConnections = conf.MaxOpenConnections
+			} else {
+				maxOpenConnections = default_maxOpenConnections
+			}
+
+			_db_connection.SetMaxIdleConns(maxIdleConnections)
+			_db_connection.SetMaxOpenConns(maxOpenConnections)
+			_db_connections = append(_db_connections, _db_connection)
+		}
+	})
+	return _db_connections
+}
+
+func NewMysqlStore(confs []MySqlConf, isSharding bool, shardCount int) *MySqlStore {
+	ms := &MySqlStore{
+		dbs:        getDbConnection(confs),
+		isSharding: isSharding,
+		shardCount: shardCount,
+	}
+
+	for _, db := range ms.dbs {
+		if !isSharding {
+			ms.shardCount = 1
+		} else {
+			if ms.shardCount == 0 {
+				ms.shardCount = default_maxTableNums
+			}
+		}
+		for i := 0; i < ms.shardCount; i++ {
+			if err := ms.createTables(db, tableName, i); err != nil {
+				fmt.Printf("create table failed: %v\n", err)
+			}
+		}
+	}
+
+	return ms
+}
+
+func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) {
+	hash_value := crc32.ChecksumIEEE([]byte(fullFileName))
+	instance_offset = int(hash_value) % len(s.dbs)
+	table_postfix = int(hash_value) % s.shardCount
+	return
+}
+
+func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err 
error) {
+	instance_offset, table_postfix := s.hash(path)
+	instanceId = instance_offset
+	if s.isSharding {
+		tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix)
+	} else {
+		tableFullName = tableName
+	}
+	return
+}
+
+func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) {
+	instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
+	if err != nil {
+		return "", fmt.Errorf("MySqlStore Get operation can not parse file path %s: err is %v", fullFilePath, err)
+	}
+	fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName)
+	if err == sql.ErrNoRows {
+		// not found is reported via the shared sentinel error
+		err = filer.ErrNotFound
+	}
+	return fid, err
+}
+
+func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) {
+	var tableFullName string
+
+	instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
+	if err != nil {
+		return fmt.Errorf("MySqlStore Put operation can not parse file path %s: err is %v", fullFilePath, err)
+	}
+	var old_fid string
+	if old_fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil && err != sql.ErrNoRows {
+		return fmt.Errorf("MySqlStore Put operation failed when querying path %s: err is %v", fullFilePath, err)
+	} else {
+		// only wrap the error when the insert/update actually failed
+		if len(old_fid) == 0 {
+			if err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName); err != nil {
+				err = fmt.Errorf("MySqlStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err)
+			}
+		} else {
+			if err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName); err != nil {
+				err = fmt.Errorf("MySqlStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err)
+			}
+		}
+	}
+	return
+}
+
+func (s *MySqlStore) Delete(fullFilePath string) (err error) {
+	var fid string
+	instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
+	if err != nil {
+		return fmt.Errorf("MySqlStore Delete operation can not parse file path %s: err is %v", fullFilePath, err)
+	}
+	if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil && err != sql.ErrNoRows {
+		return fmt.Errorf("MySqlStore Delete operation failed when querying path %s: err is %v", fullFilePath, err)
+	} else if fid == "" {
+		// nothing to delete
+		return nil
+	}
+	if err = s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
+		return fmt.Errorf("MySqlStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err)
+	}
+	return nil
+}
+
+func (s *MySqlStore) Close() {
+	for _, db := range s.dbs {
+		db.Close()
+	}
+}
+
+var createTable = `
+CREATE TABLE IF NOT EXISTS %s (
+	id bigint(20) NOT NULL AUTO_INCREMENT,
+	uriPath char(255) NOT NULL DEFAULT "" COMMENT 'http uriPath',
+	fid char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
+	createTime int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
+	updateTime int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
+	remark varchar(20) NOT NULL DEFAULT "" COMMENT 'reserved field',
+	status tinyint(2) DEFAULT '1' COMMENT 'resource status',
+	PRIMARY KEY (id),
+	UNIQUE KEY index_uriPath (uriPath)
+) DEFAULT CHARSET=utf8;
+`
+
+func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error {
+	var realTableName string
+	if s.isSharding {
+		// use the same zero-padded postfix as parseFilerMappingInfo, so lookups hit the tables created here
+		realTableName = fmt.Sprintf("%s_%04d", tableName, postfix)
+	} else {
+		realTableName = tableName
+	}
+
+	stmt, err := db.Prepare(fmt.Sprintf(createTable, realTableName))
+	if err != nil {
+		return err
+	}
+	defer stmt.Close()
+
+	_, err = stmt.Exec()
+	if err != 
nil { + return err + } + return nil +} + +func (s *MySqlStore) query(uriPath string, db *sql.DB, tableName string) (string, error) { + sqlStatement := "SELECT fid FROM %s WHERE uriPath=?" + row := db.QueryRow(fmt.Sprintf(sqlStatement, tableName), uriPath) + var fid string + err := row.Scan(&fid) + if err != nil { + return "", err + } + return fid, nil +} + +func (s *MySqlStore) update(uriPath string, fid string, db *sql.DB, tableName string) error { + sqlStatement := "UPDATE %s SET fid=?, updateTime=? WHERE uriPath=?" + res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), fid, time.Now().Unix(), uriPath) + if err != nil { + return err + } + + _, err = res.RowsAffected() + if err != nil { + return err + } + return nil +} + +func (s *MySqlStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error { + sqlStatement := "INSERT INTO %s (uriPath,fid,createTime) VALUES(?,?,?)" + res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath, fid, time.Now().Unix()) + if err != nil { + return err + } + + _, err = res.RowsAffected() + if err != nil { + return err + } + return nil +} + +func (s *MySqlStore) delete(uriPath string, db *sql.DB, tableName string) error { + sqlStatement := "DELETE FROM %s WHERE uriPath=?" + res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath) + if err != nil { + return err + } + + _, err = res.RowsAffected() + if err != nil { + return err + } + return nil +} diff --git a/weed/filer/mysql_store/mysql_store_test.go b/weed/filer/mysql_store/mysql_store_test.go new file mode 100644 index 000000000..1c9765c59 --- /dev/null +++ b/weed/filer/mysql_store/mysql_store_test.go @@ -0,0 +1,30 @@ +package mysql_store + +import ( + "encoding/json" + "hash/crc32" + "testing" +) + +func TestGenerateMysqlConf(t *testing.T) { + var conf []MySqlConf + conf = append(conf, MySqlConf{ + User: "root", + Password: "root", + HostName: "localhost", + Port: 3306, + DataBase: "seaweedfs", + }) + body, err := json.Marshal(conf) + if err != nil { + t.Errorf("json encoding err %s", err.Error()) + } + t.Logf("json output is %s", string(body)) +} + +func TestCRC32FullPathName(t *testing.T) { + fullPathName := "/prod-bucket/law632191483895612493300-signed.pdf" + hash_value := crc32.ChecksumIEEE([]byte(fullPathName)) + table_postfix := int(hash_value) % 1024 + t.Logf("table postfix %d", table_postfix) +} diff --git a/weed/filer/redis_store/redis_store.go b/weed/filer/redis_store/redis_store.go index 5e51b5455..2ad49a805 100644 --- a/weed/filer/redis_store/redis_store.go +++ b/weed/filer/redis_store/redis_store.go @@ -1,6 +1,8 @@ package redis_store import ( + "github.com/chrislusf/seaweedfs/weed/filer" + redis "gopkg.in/redis.v2" ) @@ -20,7 +22,7 @@ func NewRedisStore(hostPort string, password string, database int) *RedisStore { func (s *RedisStore) Get(fullFileName string) (fid string, err error) { fid, err = s.Client.Get(fullFileName).Result() if err == redis.Nil { - err = nil + err = filer.ErrNotFound } return fid, err } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index b99bbd7c9..959bb92cb 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -1,8 +1,10 @@ package weed_server import ( + "encoding/json" "math/rand" "net/http" + "os" "strconv" "sync" "time" @@ -11,6 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer/cassandra_store" "github.com/chrislusf/seaweedfs/weed/filer/embedded_filer" "github.com/chrislusf/seaweedfs/weed/filer/flat_namespace" + "github.com/chrislusf/seaweedfs/weed/filer/mysql_store" 
"github.com/chrislusf/seaweedfs/weed/filer/redis_store" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" @@ -18,6 +21,26 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +type filerConf struct { + MysqlConf []mysql_store.MySqlConf `json:"mysql"` + mysql_store.ShardingConf +} + +func parseConfFile(confPath string) (*filerConf, error) { + var setting filerConf + configFile, err := os.Open(confPath) + defer configFile.Close() + if err != nil { + return nil, err + } + + jsonParser := json.NewDecoder(configFile) + if err = jsonParser.Decode(&setting); err != nil { + return nil, err + } + return &setting, nil +} + type FilerServer struct { port string master string @@ -28,11 +51,14 @@ type FilerServer struct { disableDirListing bool secret security.Secret filer filer.Filer + maxMB int masterNodes *storage.MasterNodes } func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string, replication string, redirectOnRead bool, disableDirListing bool, + confFile string, + maxMB int, secret string, cassandra_server string, cassandra_keyspace string, redis_server string, redis_password string, redis_database int, @@ -43,10 +69,24 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st defaultReplication: replication, redirectOnRead: redirectOnRead, disableDirListing: disableDirListing, + maxMB: maxMB, port: ip + ":" + strconv.Itoa(port), } - if cassandra_server != "" { + var setting *filerConf + if confFile != "" { + setting, err = parseConfFile(confFile) + if err != nil { + return nil, err + } + } else { + setting = new(filerConf) + } + + if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 { + mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardCount) + fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store) + } else if cassandra_server != "" { cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server) if err != nil { glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err) diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 6b9505377..bf95e37b9 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui" @@ -87,7 +88,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, } fileId, err := fs.filer.FindFile(r.URL.Path) - if err == leveldb.ErrNotFound { + if err == filer.ErrNotFound { glog.V(3).Infoln("Not found in db", r.URL.Path) w.WriteHeader(http.StatusNotFound) return diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index e2d40f532..464cb81ef 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -15,11 +15,13 @@ import ( "net/url" "strings" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/syndtr/goleveldb/leveldb" + "path" + "strconv" ) type FilerPostResult struct { @@ -71,17 
+73,17 @@ func makeFormData(filename, mimeType string, content io.Reader) (formData io.Rea } func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) { - if fileId, err = fs.filer.FindFile(path); err != nil && err != leveldb.ErrNotFound { + if fileId, err = fs.filer.FindFile(path); err != nil && err != filer.ErrNotFound { glog.V(0).Infoln("failing to find path in filer store", path, err.Error()) writeJsonError(w, r, http.StatusInternalServerError, err) - return } else if fileId != "" && err == nil { urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId) if err != nil { glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error()) w.WriteHeader(http.StatusNotFound) - return } + } else if fileId == "" && err == filer.ErrNotFound { + w.WriteHeader(http.StatusNotFound) } return } @@ -217,6 +219,7 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R } func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() replication := query.Get("replication") if replication == "" { @@ -227,6 +230,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { collection = fs.collection } + if autoChunked := fs.autoChunk(w, r, replication, collection); autoChunked { + return + } + var fileId, urlLocation string var err error @@ -243,7 +250,17 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } u, _ := url.Parse(urlLocation) + + // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off + // because they need to provide FIDs instead of file paths... + cm, _ := strconv.ParseBool(query.Get("cm")) + if cm { + q := u.Query() + q.Set("cm", "true") + u.RawQuery = q.Encode() + } glog.V(4).Infoln("post to", u) + request := &http.Request{ Method: r.Method, URL: u, @@ -298,6 +315,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "PUT" { if oldFid, err := fs.filer.FindFile(path); err == nil { operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) + } else if err != nil && err != filer.ErrNotFound { + glog.V(0).Infof("error %v occur when finding %s in filer store", err, path) } } @@ -319,6 +338,199 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { writeJsonQuiet(w, r, http.StatusCreated, reply) } +func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string) bool { + if r.Method != "POST" { + glog.V(4).Infoln("AutoChunking not supported for method", r.Method) + return false + } + + // autoChunking can be set at the command-line level or as a query param. 
Query param overrides command-line + query := r.URL.Query() + + parsedMaxMB, _ := strconv.ParseInt(query.Get("maxMB"), 10, 32) + maxMB := int32(parsedMaxMB) + if maxMB <= 0 && fs.maxMB > 0 { + maxMB = int32(fs.maxMB) + } + if maxMB <= 0 { + glog.V(4).Infoln("AutoChunking not enabled") + return false + } + glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)") + + chunkSize := 1024 * 1024 * maxMB + + contentLength := int64(0) + if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 { + contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64) + if contentLength <= int64(chunkSize) { + glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.") + return false + } + } + + if contentLength <= 0 { + glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.") + return false + } + + reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + } else if reply != nil { + writeJsonQuiet(w, r, http.StatusCreated, reply) + } + return true +} + +func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string) (filerResult *FilerPostResult, replyerr error) { + + multipartReader, multipartReaderErr := r.MultipartReader() + if multipartReaderErr != nil { + return nil, multipartReaderErr + } + + part1, part1Err := multipartReader.NextPart() + if part1Err != nil { + return nil, part1Err + } + + fileName := part1.FileName() + if fileName != "" { + fileName = path.Base(fileName) + } + + chunks := (int64(contentLength) / int64(chunkSize)) + 1 + cm := operation.ChunkManifest{ + Name: fileName, + Size: 0, // don't know yet + Mime: "application/octet-stream", + Chunks: make([]*operation.ChunkInfo, 0, chunks), + } + + totalBytesRead := int64(0) + tmpBufferSize := int32(1024 * 1024) + tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize)) + chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow + chunkBufOffset := int32(0) + chunkOffset := int64(0) + writtenChunks := 0 + + filerResult = &FilerPostResult{ + Name: fileName, + } + + for totalBytesRead < contentLength { + tmpBuffer.Reset() + bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize)) + readFully := readErr != nil && readErr == io.EOF + tmpBuf := tmpBuffer.Bytes() + bytesToCopy := tmpBuf[0:int(bytesRead)] + + copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy) + chunkBufOffset = chunkBufOffset + int32(bytesRead) + + if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) { + writtenChunks = writtenChunks + 1 + fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection) + if assignErr != nil { + return nil, assignErr + } + + // upload the chunk to the volume server + chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(cm.Chunks.Len()+1), 10) + uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId) + if uploadErr != nil { + return nil, uploadErr + } + + // Save to chunk manifest structure + cm.Chunks = append(cm.Chunks, + &operation.ChunkInfo{ + Offset: chunkOffset, + Size: int64(chunkBufOffset), + Fid: fileId, + }, + ) + + // reset variables for the next chunk + chunkBufOffset = 0 + 
chunkOffset = totalBytesRead + int64(bytesRead) + } + + totalBytesRead = totalBytesRead + int64(bytesRead) + + if bytesRead == 0 || readFully { + break + } + + if readErr != nil { + return nil, readErr + } + } + + cm.Size = totalBytesRead + manifestBuf, marshalErr := cm.Marshal() + if marshalErr != nil { + return nil, marshalErr + } + + manifestStr := string(manifestBuf) + glog.V(4).Infoln("Generated chunk manifest: ", manifestStr) + + manifestFileId, manifestUrlLocation, manifestAssignmentErr := fs.assignNewFileInfo(w, r, replication, collection) + if manifestAssignmentErr != nil { + return nil, manifestAssignmentErr + } + glog.V(4).Infoln("Manifest uploaded to:", manifestUrlLocation, "Fid:", manifestFileId) + filerResult.Fid = manifestFileId + + u, _ := url.Parse(manifestUrlLocation) + q := u.Query() + q.Set("cm", "true") + u.RawQuery = q.Encode() + + manifestUploadErr := fs.doUpload(u.String(), w, r, manifestBuf, fileName+"_manifest", "application/json", manifestFileId) + if manifestUploadErr != nil { + return nil, manifestUploadErr + } + + path := r.URL.Path + // also delete the old fid unless PUT operation + if r.Method != "PUT" { + if oldFid, err := fs.filer.FindFile(path); err == nil { + operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) + } else if err != nil && err != filer.ErrNotFound { + glog.V(0).Infof("error %v occur when finding %s in filer store", err, path) + } + } + + glog.V(4).Infoln("saving", path, "=>", manifestFileId) + if db_err := fs.filer.CreateFile(path, manifestFileId); db_err != nil { + replyerr = db_err + filerResult.Error = db_err.Error() + operation.DeleteFile(fs.getMasterNode(), manifestFileId, fs.jwt(manifestFileId)) //clean up + glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) + return + } + + return +} + +func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) { + err = nil + + ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf)) + uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId)) + if uploadResult != nil { + glog.V(0).Infoln("Chunk upload result. 
Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size) + } + if uploadError != nil { + err = uploadError + } + return +} + // curl -X DELETE http://localhost:8888/path/to // curl -X DELETE http://localhost:8888/path/to?recursive=true func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { diff --git a/weed/storage/compact_map.go b/weed/storage/compact_map.go index 6afaf7df8..721be2ec7 100644 --- a/weed/storage/compact_map.go +++ b/weed/storage/compact_map.go @@ -41,10 +41,10 @@ func NewCompactSection(start Key) *CompactSection { //return old entry size func (cs *CompactSection) Set(key Key, offset uint32, size uint32) uint32 { ret := uint32(0) + cs.Lock() if key > cs.end { cs.end = key } - cs.Lock() if i := cs.binarySearchValues(key); i >= 0 { ret = cs.values[i].Size //println("key", key, "old size", ret) diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 801dfe267..c1d531376 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -23,6 +23,9 @@ type Volume struct { dataFileAccessLock sync.Mutex lastModifiedTime uint64 //unix time in seconds + + lastCompactIndexOffset uint64 + lastCompactRevision uint16 } func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index d424010f1..48f707594 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -21,7 +21,6 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error { return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) } key, offset, size := idxFileEntry(lastIdxEntry) - //deleted index entry could not point to deleted needle if offset == 0 { return nil } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 9b9a27816..723300557 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -6,6 +6,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" ) func (v *Volume) garbageLevel() float64 { @@ -20,9 +21,19 @@ func (v *Volume) Compact() error { //glog.V(3).Infof("Got Compaction lock...") filePath := v.FileName() - glog.V(3).Infof("creating copies for volume %d ...", v.Id) + v.lastCompactIndexOffset = v.nm.IndexFileSize() + v.lastCompactRevision = v.SuperBlock.CompactRevision + glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx") } + +func (v *Volume) Compact2() error { + glog.V(3).Infof("Compact2 ...") + filePath := v.FileName() + glog.V(3).Infof("creating copies for volume %d ...", v.Id) + return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx") +} + func (v *Volume) commitCompact() error { glog.V(3).Infof("Committing vacuuming...") v.dataFileAccessLock.Lock() @@ -30,13 +41,28 @@ func (v *Volume) commitCompact() error { glog.V(3).Infof("Got Committing lock...") v.nm.Close() _ = v.dataFile.Close() + var e error - if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { - return e - } - if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { - return e + if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil { + glog.V(0).Infof("makeupDiff in commitCompact failed %v", e) + e = os.Remove(v.FileName() + ".cpd") + if e != 
nil { + return e + } + e = os.Remove(v.FileName() + ".cpx") + if e != nil { + return e + } + } else { + var e error + if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { + return e + } + if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { + return e + } } + //glog.V(3).Infof("Pretending to be vacuuming...") //time.Sleep(20 * time.Second) glog.V(3).Infof("Loading Commit file...") @@ -46,6 +72,141 @@ func (v *Volume) commitCompact() error { return nil } +func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) { + if _, err = file.Seek(0, 0); err != nil { + return 0, fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err) + } + header := make([]byte, SuperBlockSize) + if _, e := file.Read(header); e != nil { + return 0, fmt.Errorf("cannot read file %s 's super block: %v", file.Name(), e) + } + superBlock, err := ParseSuperBlock(header) + if err != nil { + return 0, err + } + return superBlock.CompactRevision, nil +} + +func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) { + var indexSize int64 + + oldIdxFile, err := os.Open(oldIdxFileName) + defer oldIdxFile.Close() + + oldDatFile, err := os.Open(oldDatFileName) + defer oldDatFile.Close() + + if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil { + return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err) + } + if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset { + return nil + } + + oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatFile) + if err != nil { + return + } + if oldDatCompactRevision != v.lastCompactRevision { + return fmt.Errorf("current old dat file's compact revision %d is not the expected one %d", oldDatCompactRevision, v.lastCompactRevision) + } + + type keyField struct { + offset uint32 + size uint32 + } + incrementedHasUpdatedIndexEntry := make(map[uint64]keyField) + + for idx_offset := indexSize - NeedleIndexSize; uint64(idx_offset) >= v.lastCompactIndexOffset; idx_offset -= NeedleIndexSize { + var IdxEntry []byte + if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil { + return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err) + } + key, offset, size := idxFileEntry(IdxEntry) + if _, found := incrementedHasUpdatedIndexEntry[key]; !found { + incrementedHasUpdatedIndexEntry[key] = keyField{ + offset: offset, + size: size, + } + } + } + + if len(incrementedHasUpdatedIndexEntry) > 0 { + var ( + dst, idx *os.File + ) + if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil { + return + } + defer dst.Close() + + if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil { + return + } + defer idx.Close() + + var newDatCompactRevision uint16 + newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst) + if err != nil { + return + } + if oldDatCompactRevision+1 != newDatCompactRevision { + return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision) + } + + idx_entry_bytes := make([]byte, 16) + for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry { + util.Uint64toBytes(idx_entry_bytes[0:8], key) + util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset) + util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size) + + var offset int64 + if offset, err = dst.Seek(0, 2); 
err != nil {
+				glog.V(0).Infof("failed to seek the end of file: %v", err)
+				return
+			}
+			//ensure file writing starting from aligned positions
+			if offset%NeedlePaddingSize != 0 {
+				offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
+				if offset, err = dst.Seek(offset, 0); err != nil {
+					glog.V(0).Infof("failed to align in new datafile %s: %v", dst.Name(), err)
+					return
+				}
+			}
+
+			//updated needle
+			if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 {
+				//even if the needle cache in memory is hit, the needle_bytes are correct
+				var needle_bytes []byte
+				needle_bytes, _, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
+				if err != nil {
+					return
+				}
+				dst.Write(needle_bytes)
+				util.Uint32toBytes(idx_entry_bytes[8:12], uint32(offset/NeedlePaddingSize))
+			} else { //deleted needle
+				//fakeDelNeedle 's default Data field is nil
+				fakeDelNeedle := new(Needle)
+				fakeDelNeedle.Id = key
+				fakeDelNeedle.Cookie = 0x12345678
+				_, err = fakeDelNeedle.Append(dst, v.Version())
+				if err != nil {
+					return
+				}
+				util.Uint32toBytes(idx_entry_bytes[8:12], uint32(0))
+			}
+
+			if _, err := idx.Seek(0, 2); err != nil {
+				return fmt.Errorf("cannot seek end of indexfile %s: %v",
+					newIdxFileName, err)
+			}
+			_, err = idx.Write(idx_entry_bytes)
+		}
+	}
+
+	return nil
+}
+
 func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
 	var (
 		dst, idx *os.File
@@ -91,3 +252,64 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
 
 	return
 }
+
+func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
+	var (
+		dst, idx, oldIndexFile *os.File
+	)
+	if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
+		return
+	}
+	defer dst.Close()
+
+	if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
+		return
+	}
+	defer idx.Close()
+
+	if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil {
+		return
+	}
+	defer oldIndexFile.Close()
+
+	nm := NewNeedleMap(idx)
+	now := uint64(time.Now().Unix())
+
+	v.SuperBlock.CompactRevision++
+	dst.Write(v.SuperBlock.Bytes())
+	new_offset := int64(SuperBlockSize)
+
+	err = WalkIndexFile(oldIndexFile, func(key uint64, offset, size uint32) error {
+		if size <= 0 {
+			return nil
+		}
+
+		nv, ok := v.nm.Get(key)
+		if !ok {
+			return nil
+		}
+
+		n := new(Needle)
+		if readErr := n.ReadData(v.dataFile, int64(offset)*NeedlePaddingSize, size, v.Version()); readErr != nil {
+			return fmt.Errorf("cannot read needle: %s", readErr)
+		}
+		defer n.ReleaseMemory()
+
+		if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
+			return nil
+		}
+
+		glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
+		if nv.Offset == offset && nv.Size > 0 {
+			if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
+				return fmt.Errorf("cannot put needle: %s", err)
+			}
+			if _, err = n.Append(dst, v.Version()); err != nil {
+				return fmt.Errorf("cannot append needle: %s", err)
+			}
+			new_offset += n.DiskSize()
+			glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size)
+		}
+		return nil
+	})
+
+	return
+}
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
new file mode 100644
index 000000000..c2fac6ce8
--- /dev/null
+++ b/weed/storage/volume_vacuum_test.go
@@ -0,0 +1,55 @@
+package storage
+
+import (
+	"testing"
+)
+
+/*
+makediff test steps
+1. 
launch weed server in your local/dev environment (set the master option "garbageThreshold" and the volume option "max"
+to specific values that make preparing the test prerequisites easier):
+   a) ./weed master -garbageThreshold=0.99 -mdir=./m
+   b) ./weed volume -dir=./data -max=1 -mserver=localhost:9333 -port=8080
+2. upload 4 different files; you can call dir/assign to get 4 different fids
+   a) upload file A with fid a
+   b) upload file B with fid b
+   c) upload file C with fid c
+   d) upload file D with fid d
+3. update files A and C
+   a) modify file A and upload file A with fid a
+   b) modify file C and upload file C with fid c
+   c) record the current 1.idx file size (the lastCompactIndexOffset value)
+4. compact the data file
+   a) run curl http://localhost:8080/admin/vacuum/compact?volumeId=1
+   b) verify that 1.cpd and 1.cpx are created under the volume directory
+5. update file B and delete file D
+   a) modify file B and upload file B with fid b
+   b) delete file D with fid d
+6. now you can run the UT case below; it should pass
+7. commit the compaction manually
+   a) mv 1.cpd 1.dat
+   b) mv 1.cpx 1.idx
+8. restart the volume server
+9. now you should get the updated files A, B, C
+*/
+
+func TestMakeDiff(t *testing.T) {
+
+	v := new(Volume)
+	//lastCompactIndexOffset value is the index file size before step 4
+	v.lastCompactIndexOffset = 96
+	v.SuperBlock.version = 0x2
+	/*
+		err := v.makeupDiff(
+			"/yourpath/1.cpd",
+			"/yourpath/1.cpx",
+			"/yourpath/1.dat",
+			"/yourpath/1.idx")
+		if err != nil {
+			t.Errorf("makeupDiff err is %v", err)
+		} else {
+			t.Log("makeupDiff Succeeded")
+		}
+	*/
+}
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 1404d4aa8..b7f039559 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -53,7 +53,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo
 	for _, v := range actualVolumes {
 		actualVolumeMap[v.Id] = v
 	}
-	dn.RLock()
+	dn.Lock()
 	for vid, v := range dn.volumes {
 		if _, ok := actualVolumeMap[vid]; !ok {
 			glog.V(0).Infoln("Deleting volume id:", vid)
@@ -62,8 +62,8 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo
 			dn.UpAdjustVolumeCountDelta(-1)
 			dn.UpAdjustActiveVolumeCountDelta(-1)
 		}
-	} //TODO: adjust max volume id, if need to reclaim volume ids
-	dn.RUnlock()
+	}
+	dn.Unlock()
 	for _, v := range actualVolumes {
 		dn.AddOrUpdateVolume(v)
 	}
@@ -79,6 +79,17 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
 	return ret
 }
 
+func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) {
+	dn.RLock()
+	defer dn.RUnlock()
+	v_info, ok := dn.volumes[id]
+	if ok {
+		return v_info, nil
+	} else {
+		return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
+	}
+}
+
 func (dn *DataNode) GetDataCenter() *DataCenter {
 	return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
 }
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 066f5f69a..af8503b29 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -45,6 +45,19 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
 	}
 	vl.vid2location[v.Id].Set(dn)
 	glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
+	for _, dn := range vl.vid2location[v.Id].list {
+		if v_info, err := dn.GetVolumesById(v.Id); err == nil {
+			if v_info.ReadOnly {
+				glog.V(3).Infof("vid %d removed from writable", v.Id)
+
vl.removeFromWritable(v.Id) + return + } + } else { + glog.V(3).Infof("vid %d removed from writable", v.Id) + vl.removeFromWritable(v.Id) + return + } + } if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { if _, ok := vl.oversizedVolumes[v.Id]; !ok { vl.addToWritable(v.Id)