Browse Source

refactoring for later security changes

pull/45/head
Chris Lu 10 years ago
parent
commit
a3e4145e8a
  1. 146
      go/security/guard.go
  2. 4
      go/weed/master.go
  3. 8
      go/weed/server.go
  4. 4
      go/weed/volume.go
  5. 20
      go/weed/weed_server/common.go
  6. 32
      go/weed/weed_server/master_server.go
  7. 31
      go/weed/weed_server/volume_server.go
  8. 6
      go/weed/weed_server/volume_server_handlers.go
  9. 36
      note/security.txt

146
go/security/guard.go

@ -0,0 +1,146 @@
package security
import (
"errors"
"fmt"
"net"
"net/http"
"strings"
"time"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/dgrijalva/jwt-go"
)
var (
ErrUnauthorized = errors.New("unauthorized token")
)
/*
Guard is to ensure data access security.
There are 2 ways to check access:
1. white list. It's checking request ip address.
2. JSON Web Token(JWT) generated from secretKey.
The jwt can come from:
1. url parameter jwt=...
2. request header "Authorization"
3. cookie with the name "jwt"
The white list is checked first because it is easy.
Then the JWT is checked.
The Guard will also check these claims if provided:
1. "exp" Expiration Time
2. "nbf" Not Before
Generating JWT:
1. use HS256 to sign
2. optionally set "exp", "nbf" fields, in Unix time,
the number of seconds elapsed since January 1, 1970 UTC.
Referenced:
https://github.com/pkieltyka/jwtauth/blob/master/jwtauth.go
*/
type Guard struct {
whiteList []string
secretKey string
isActive bool
}
func NewGuard(whiteList []string, secretKey string) *Guard {
g := &Guard{whiteList: whiteList, secretKey: secretKey}
g.isActive = len(g.whiteList) != 0 || len(g.secretKey) != 0
return g
}
func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
if !g.isActive {
//if no security needed, just skip all checkings
return f
}
return func(w http.ResponseWriter, r *http.Request) {
if err := g.doCheck(w, r); err != nil {
w.WriteHeader(http.StatusUnauthorized)
return
}
f(w, r)
}
}
func (g *Guard) NewToken() (tokenString string, err error) {
m := make(map[string]interface{})
m["exp"] = time.Now().Unix() + 10
return g.Encode(m)
}
func (g *Guard) Encode(claims map[string]interface{}) (tokenString string, err error) {
if !g.isActive {
return "", nil
}
t := jwt.New(jwt.GetSigningMethod("HS256"))
t.Claims = claims
return t.SignedString(g.secretKey)
}
func (g *Guard) Decode(tokenString string) (token *jwt.Token, err error) {
return jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
return g.secretKey, nil
})
}
func (g *Guard) doCheck(w http.ResponseWriter, r *http.Request) error {
if len(g.whiteList) != 0 {
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err == nil {
for _, ip := range g.whiteList {
if ip == host {
return nil
}
}
}
}
if len(g.secretKey) != 0 {
// Get token from query params
tokenStr := r.URL.Query().Get("jwt")
// Get token from authorization header
if tokenStr == "" {
bearer := r.Header.Get("Authorization")
if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" {
tokenStr = bearer[7:]
}
}
// Get token from cookie
if tokenStr == "" {
cookie, err := r.Cookie("jwt")
if err == nil {
tokenStr = cookie.Value
}
}
if tokenStr == "" {
return ErrUnauthorized
}
// Verify the token
token, err := g.Decode(tokenStr)
if err != nil {
glog.V(1).Infof("Token verification error from %s: %v", r.RemoteAddr, err)
return ErrUnauthorized
}
if !token.Valid {
glog.V(1).Infof("Token invliad from %s: %v", r.RemoteAddr, tokenStr)
return ErrUnauthorized
}
}
glog.V(1).Infof("No permission from %s", r.RemoteAddr)
return fmt.Errorf("No write permisson from %s", r.RemoteAddr)
}

4
go/weed/master.go

@ -42,6 +42,7 @@ var (
mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
masterSecureKey = cmdMaster.Flag.String("secure.key", "", "secret key to check permission")
masterWhiteList []string masterWhiteList []string
) )
@ -60,7 +61,8 @@ func runMaster(cmd *Command, args []string) bool {
r := mux.NewRouter() r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, *mport, *metaFolder, ms := weed_server.NewMasterServer(r, *mport, *metaFolder,
*volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, masterWhiteList,
*volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold,
masterWhiteList, *masterSecureKey,
) )
listeningAddress := *masterBindIp + ":" + strconv.Itoa(*mport) listeningAddress := *masterBindIp + ":" + strconv.Itoa(*mport)

8
go/weed/server.go

@ -56,6 +56,7 @@ var (
serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
serverPeers = cmdServer.Flag.String("master.peers", "", "other master nodes in comma separated ip:masterPort list") serverPeers = cmdServer.Flag.String("master.peers", "", "other master nodes in comma separated ip:masterPort list")
serverSecureKey = cmdServer.Flag.String("secure.key", "", "secret key to ensure authenticated access")
serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
@ -185,7 +186,8 @@ func runServer(cmd *Command, args []string) bool {
go func() { go func() {
r := mux.NewRouter() r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder, ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder,
*masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold, serverWhiteList,
*masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold,
serverWhiteList, *serverSecureKey,
) )
glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort)) glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort))
@ -217,8 +219,8 @@ func runServer(cmd *Command, args []string) bool {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
r := http.NewServeMux() r := http.NewServeMux()
volumeServer := weed_server.NewVolumeServer(r, *serverIp, *volumePort, *serverPublicIp, folders, maxCounts, volumeServer := weed_server.NewVolumeServer(r, *serverIp, *volumePort, *serverPublicIp, folders, maxCounts,
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, serverWhiteList,
*volumeFixJpgOrientation,
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
serverWhiteList, *volumeFixJpgOrientation,
) )
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort)) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort))

4
go/weed/volume.go

@ -27,6 +27,7 @@ var cmdVolume = &Command{
var ( var (
vport = cmdVolume.Flag.Int("port", 8080, "http listen port") vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
volumeSecurePort = cmdVolume.Flag.Int("port.secure", 8443, "https listen port, active when SSL certs are specified. Not ready yet.")
volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...") maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
ip = cmdVolume.Flag.String("ip", "", "ip or server name") ip = cmdVolume.Flag.String("ip", "", "ip or server name")
@ -82,7 +83,8 @@ func runVolume(cmd *Command, args []string) bool {
r := http.NewServeMux() r := http.NewServeMux()
volumeServer := weed_server.NewVolumeServer(r, *ip, *vport, *publicIp, folders, maxCounts, volumeServer := weed_server.NewVolumeServer(r, *ip, *vport, *publicIp, folders, maxCounts,
*masterNode, *vpulse, *dataCenter, *rack, volumeWhiteList,
*masterNode, *vpulse, *dataCenter, *rack,
volumeWhiteList,
*fixJpgOrientation, *fixJpgOrientation,
) )

20
go/weed/weed_server/common.go

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net"
"net/http" "net/http"
"path/filepath" "path/filepath"
"strconv" "strconv"
@ -72,25 +71,6 @@ func debug(params ...interface{}) {
glog.V(4).Infoln(params) glog.V(4).Infoln(params)
} }
func secure(whiteList []string, f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if len(whiteList) == 0 {
f(w, r)
return
}
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err == nil {
for _, ip := range whiteList {
if ip == host {
f(w, r)
return
}
}
}
writeJsonQuiet(w, r, map[string]interface{}{"error": "No write permisson from " + host})
}
}
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) {
m := make(map[string]interface{}) m := make(map[string]interface{})
if r.Method != "POST" { if r.Method != "POST" {

32
go/weed/weed_server/master_server.go

@ -7,6 +7,7 @@ import (
"sync" "sync"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/security"
"github.com/chrislusf/weed-fs/go/sequence" "github.com/chrislusf/weed-fs/go/sequence"
"github.com/chrislusf/weed-fs/go/topology" "github.com/chrislusf/weed-fs/go/topology"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
@ -21,7 +22,6 @@ type MasterServer struct {
pulseSeconds int pulseSeconds int
defaultReplicaPlacement string defaultReplicaPlacement string
garbageThreshold string garbageThreshold string
whiteList []string
Topo *topology.Topology Topo *topology.Topology
vg *topology.VolumeGrowth vg *topology.VolumeGrowth
@ -37,6 +37,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
defaultReplicaPlacement string, defaultReplicaPlacement string,
garbageThreshold string, garbageThreshold string,
whiteList []string, whiteList []string,
secureKey string,
) *MasterServer { ) *MasterServer {
ms := &MasterServer{ ms := &MasterServer{
port: port, port: port,
@ -44,7 +45,6 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
pulseSeconds: pulseSeconds, pulseSeconds: pulseSeconds,
defaultReplicaPlacement: defaultReplicaPlacement, defaultReplicaPlacement: defaultReplicaPlacement,
garbageThreshold: garbageThreshold, garbageThreshold: garbageThreshold,
whiteList: whiteList,
} }
ms.bounedLeaderChan = make(chan int, 16) ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer() seq := sequence.NewMemorySequencer()
@ -56,20 +56,22 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
ms.vg = topology.NewDefaultVolumeGrowth() ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
r.HandleFunc("/dir/assign", ms.proxyToLeader(secure(ms.whiteList, ms.dirAssignHandler)))
r.HandleFunc("/dir/lookup", ms.proxyToLeader(secure(ms.whiteList, ms.dirLookupHandler)))
r.HandleFunc("/dir/join", ms.proxyToLeader(secure(ms.whiteList, ms.dirJoinHandler)))
r.HandleFunc("/dir/status", ms.proxyToLeader(secure(ms.whiteList, ms.dirStatusHandler)))
r.HandleFunc("/col/delete", ms.proxyToLeader(secure(ms.whiteList, ms.collectionDeleteHandler)))
r.HandleFunc("/vol/lookup", ms.proxyToLeader(secure(ms.whiteList, ms.volumeLookupHandler)))
r.HandleFunc("/vol/grow", ms.proxyToLeader(secure(ms.whiteList, ms.volumeGrowHandler)))
r.HandleFunc("/vol/status", ms.proxyToLeader(secure(ms.whiteList, ms.volumeStatusHandler)))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(secure(ms.whiteList, ms.volumeVacuumHandler)))
r.HandleFunc("/submit", secure(ms.whiteList, ms.submitFromMasterServerHandler))
r.HandleFunc("/delete", secure(ms.whiteList, ms.deleteFromMasterServerHandler))
guard := security.NewGuard(whiteList, secureKey)
r.HandleFunc("/dir/assign", ms.proxyToLeader(guard.Secure(ms.dirAssignHandler)))
r.HandleFunc("/dir/lookup", ms.proxyToLeader(guard.Secure(ms.dirLookupHandler)))
r.HandleFunc("/dir/join", ms.proxyToLeader(guard.Secure(ms.dirJoinHandler)))
r.HandleFunc("/dir/status", ms.proxyToLeader(guard.Secure(ms.dirStatusHandler)))
r.HandleFunc("/col/delete", ms.proxyToLeader(guard.Secure(ms.collectionDeleteHandler)))
r.HandleFunc("/vol/lookup", ms.proxyToLeader(guard.Secure(ms.volumeLookupHandler)))
r.HandleFunc("/vol/grow", ms.proxyToLeader(guard.Secure(ms.volumeGrowHandler)))
r.HandleFunc("/vol/status", ms.proxyToLeader(guard.Secure(ms.volumeStatusHandler)))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(guard.Secure(ms.volumeVacuumHandler)))
r.HandleFunc("/submit", guard.Secure(ms.submitFromMasterServerHandler))
r.HandleFunc("/delete", guard.Secure(ms.deleteFromMasterServerHandler))
r.HandleFunc("/{fileId}", ms.redirectHandler) r.HandleFunc("/{fileId}", ms.redirectHandler)
r.HandleFunc("/stats/counter", secure(ms.whiteList, statsCounterHandler))
r.HandleFunc("/stats/memory", secure(ms.whiteList, statsMemoryHandler))
r.HandleFunc("/stats/counter", guard.Secure(statsCounterHandler))
r.HandleFunc("/stats/memory", guard.Secure(statsMemoryHandler))
ms.Topo.StartRefreshWritableVolumes(garbageThreshold) ms.Topo.StartRefreshWritableVolumes(garbageThreshold)

31
go/weed/weed_server/volume_server.go

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/security"
"github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/storage"
) )
@ -15,8 +16,8 @@ type VolumeServer struct {
pulseSeconds int pulseSeconds int
dataCenter string dataCenter string
rack string rack string
whiteList []string
store *storage.Store store *storage.Store
guard *security.Guard
FixJpgOrientation bool FixJpgOrientation bool
} }
@ -24,29 +25,31 @@ type VolumeServer struct {
func NewVolumeServer(r *http.ServeMux, ip string, port int, publicIp string, folders []string, maxCounts []int, func NewVolumeServer(r *http.ServeMux, ip string, port int, publicIp string, folders []string, maxCounts []int,
masterNode string, pulseSeconds int, masterNode string, pulseSeconds int,
dataCenter string, rack string, dataCenter string, rack string,
whiteList []string, fixJpgOrientation bool) *VolumeServer {
whiteList []string,
fixJpgOrientation bool) *VolumeServer {
publicUrl := publicIp + ":" + strconv.Itoa(port) publicUrl := publicIp + ":" + strconv.Itoa(port)
vs := &VolumeServer{ vs := &VolumeServer{
masterNode: masterNode, masterNode: masterNode,
pulseSeconds: pulseSeconds, pulseSeconds: pulseSeconds,
dataCenter: dataCenter, dataCenter: dataCenter,
rack: rack, rack: rack,
whiteList: whiteList,
FixJpgOrientation: fixJpgOrientation, FixJpgOrientation: fixJpgOrientation,
} }
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts) vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts)
r.HandleFunc("/status", secure(vs.whiteList, vs.statusHandler))
r.HandleFunc("/admin/assign_volume", secure(vs.whiteList, vs.assignVolumeHandler))
r.HandleFunc("/admin/vacuum_volume_check", secure(vs.whiteList, vs.vacuumVolumeCheckHandler))
r.HandleFunc("/admin/vacuum_volume_compact", secure(vs.whiteList, vs.vacuumVolumeCompactHandler))
r.HandleFunc("/admin/vacuum_volume_commit", secure(vs.whiteList, vs.vacuumVolumeCommitHandler))
r.HandleFunc("/admin/freeze_volume", secure(vs.whiteList, vs.freezeVolumeHandler))
r.HandleFunc("/admin/delete_collection", secure(vs.whiteList, vs.deleteCollectionHandler))
r.HandleFunc("/stats/counter", secure(vs.whiteList, statsCounterHandler))
r.HandleFunc("/stats/memory", secure(vs.whiteList, statsMemoryHandler))
r.HandleFunc("/stats/disk", secure(vs.whiteList, vs.statsDiskHandler))
r.HandleFunc("/delete", secure(vs.whiteList, vs.batchDeleteHandler))
vs.guard = security.NewGuard(whiteList, "")
r.HandleFunc("/status", vs.guard.Secure(vs.statusHandler))
r.HandleFunc("/admin/assign_volume", vs.guard.Secure(vs.assignVolumeHandler))
r.HandleFunc("/admin/vacuum_volume_check", vs.guard.Secure(vs.vacuumVolumeCheckHandler))
r.HandleFunc("/admin/vacuum_volume_compact", vs.guard.Secure(vs.vacuumVolumeCompactHandler))
r.HandleFunc("/admin/vacuum_volume_commit", vs.guard.Secure(vs.vacuumVolumeCommitHandler))
r.HandleFunc("/admin/freeze_volume", vs.guard.Secure(vs.freezeVolumeHandler))
r.HandleFunc("/admin/delete_collection", vs.guard.Secure(vs.deleteCollectionHandler))
r.HandleFunc("/stats/counter", vs.guard.Secure(statsCounterHandler))
r.HandleFunc("/stats/memory", vs.guard.Secure(statsMemoryHandler))
r.HandleFunc("/stats/disk", vs.guard.Secure(vs.statsDiskHandler))
r.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler))
r.HandleFunc("/", vs.storeHandler) r.HandleFunc("/", vs.storeHandler)
go func() { go func() {

6
go/weed/weed_server/volume_server_handlers.go

@ -29,13 +29,13 @@ func (vs *VolumeServer) storeHandler(w http.ResponseWriter, r *http.Request) {
vs.GetOrHeadHandler(w, r) vs.GetOrHeadHandler(w, r)
case "DELETE": case "DELETE":
stats.DeleteRequest() stats.DeleteRequest()
secure(vs.whiteList, vs.DeleteHandler)(w, r)
vs.guard.Secure(vs.DeleteHandler)(w, r)
case "PUT": case "PUT":
stats.WriteRequest() stats.WriteRequest()
secure(vs.whiteList, vs.PostHandler)(w, r)
vs.guard.Secure(vs.PostHandler)(w, r)
case "POST": case "POST":
stats.WriteRequest() stats.WriteRequest()
secure(vs.whiteList, vs.PostHandler)(w, r)
vs.guard.Secure(vs.PostHandler)(w, r)
} }
} }

36
note/security.txt

@ -0,0 +1,36 @@
Design for Seaweed-FS security
Design Objectives
Security can mean many different things. The original vision is that: if you have one machine lying around
somewhere with some disk space, it should be able to join your file system to contribute some disk space and
network bandwidth.
To achieve this purpose, the security should be able to:
1. Secure the inter-server communication. Only real cluster servers can join and communicate.
2. allow clients to securely write to volume servers
Non Objective
Multi-tenant support. Avoid filers or clients cross-updating files.
User specific access control.
Design Architect
master, and volume servers all talk securely via 2-way SSL for admin.
upon joining, master gives its secret key to volume servers.
filer or clients talk to master to get secret key, and use the key to generate JWT to write on volume server.
A side benefit:
a time limited read feature?
4. volume server needs to expose https ports
HTTP Connections
clear http
filer~>master, need to get a JWT from master
filer~>volume
2-way https
master~ssl~>volume
volume~ssl~>master
file uploading:
when volume server starts, it asks master for the secret key to decode JWT
when filer/clients wants to upload, master generate a JWT
filer~>volume(public port)
master~>volume(public port)
Loading…
Cancel
Save