You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
383 lines
11 KiB
383 lines
11 KiB
package sts
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// MemorySessionStore implements SessionStore using in-memory storage
|
|
type MemorySessionStore struct {
|
|
sessions map[string]*SessionInfo
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// NewMemorySessionStore creates a new memory-based session store
|
|
func NewMemorySessionStore() *MemorySessionStore {
|
|
return &MemorySessionStore{
|
|
sessions: make(map[string]*SessionInfo),
|
|
}
|
|
}
|
|
|
|
// StoreSession stores session information in memory (filerAddress ignored for memory store)
|
|
func (m *MemorySessionStore) StoreSession(ctx context.Context, filerAddress string, sessionId string, session *SessionInfo) error {
|
|
if sessionId == "" {
|
|
return fmt.Errorf(ErrSessionIDCannotBeEmpty)
|
|
}
|
|
|
|
if session == nil {
|
|
return fmt.Errorf("session cannot be nil")
|
|
}
|
|
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
m.sessions[sessionId] = session
|
|
return nil
|
|
}
|
|
|
|
// GetSession retrieves session information from memory (filerAddress ignored for memory store)
|
|
func (m *MemorySessionStore) GetSession(ctx context.Context, filerAddress string, sessionId string) (*SessionInfo, error) {
|
|
if sessionId == "" {
|
|
return nil, fmt.Errorf(ErrSessionIDCannotBeEmpty)
|
|
}
|
|
|
|
m.mutex.RLock()
|
|
defer m.mutex.RUnlock()
|
|
|
|
session, exists := m.sessions[sessionId]
|
|
if !exists {
|
|
return nil, fmt.Errorf("session not found")
|
|
}
|
|
|
|
// Check if session has expired
|
|
if time.Now().After(session.ExpiresAt) {
|
|
return nil, fmt.Errorf("session has expired")
|
|
}
|
|
|
|
return session, nil
|
|
}
|
|
|
|
// RevokeSession revokes a session from memory (filerAddress ignored for memory store)
|
|
func (m *MemorySessionStore) RevokeSession(ctx context.Context, filerAddress string, sessionId string) error {
|
|
if sessionId == "" {
|
|
return fmt.Errorf(ErrSessionIDCannotBeEmpty)
|
|
}
|
|
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
delete(m.sessions, sessionId)
|
|
return nil
|
|
}
|
|
|
|
// CleanupExpiredSessions removes expired sessions from memory (filerAddress ignored for memory store)
|
|
func (m *MemorySessionStore) CleanupExpiredSessions(ctx context.Context, filerAddress string) error {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
now := time.Now()
|
|
for sessionId, session := range m.sessions {
|
|
if now.After(session.ExpiresAt) {
|
|
delete(m.sessions, sessionId)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ExpireSessionForTesting manually expires a session for testing purposes (filerAddress ignored for memory store)
|
|
func (m *MemorySessionStore) ExpireSessionForTesting(ctx context.Context, filerAddress string, sessionId string) error {
|
|
if sessionId == "" {
|
|
return fmt.Errorf(ErrSessionIDCannotBeEmpty)
|
|
}
|
|
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
session, exists := m.sessions[sessionId]
|
|
if !exists {
|
|
return fmt.Errorf("session not found")
|
|
}
|
|
|
|
// Set expiration to 1 minute in the past to ensure it's expired
|
|
session.ExpiresAt = time.Now().Add(-1 * time.Minute)
|
|
m.sessions[sessionId] = session
|
|
|
|
return nil
|
|
}
|
|
|
|
// FilerSessionStore implements SessionStore using SeaweedFS filer
|
|
type FilerSessionStore struct {
|
|
grpcDialOption grpc.DialOption
|
|
basePath string
|
|
}
|
|
|
|
// NewFilerSessionStore creates a new filer-based session store
|
|
func NewFilerSessionStore(config map[string]interface{}) (*FilerSessionStore, error) {
|
|
store := &FilerSessionStore{
|
|
basePath: DefaultSessionBasePath, // Use constant default
|
|
}
|
|
|
|
// Parse configuration - only basePath and other settings, NOT filerAddress
|
|
if config != nil {
|
|
if basePath, ok := config[ConfigFieldBasePath].(string); ok && basePath != "" {
|
|
store.basePath = strings.TrimSuffix(basePath, "/")
|
|
}
|
|
}
|
|
|
|
glog.V(2).Infof("Initialized FilerSessionStore with basePath %s", store.basePath)
|
|
|
|
return store, nil
|
|
}
|
|
|
|
// StoreSession stores session information in filer
|
|
func (f *FilerSessionStore) StoreSession(ctx context.Context, filerAddress string, sessionId string, session *SessionInfo) error {
|
|
if filerAddress == "" {
|
|
return fmt.Errorf(ErrFilerAddressRequired)
|
|
}
|
|
if sessionId == "" {
|
|
return fmt.Errorf(ErrSessionIDCannotBeEmpty)
|
|
}
|
|
if session == nil {
|
|
return fmt.Errorf("session cannot be nil")
|
|
}
|
|
|
|
// Serialize session to JSON
|
|
sessionData, err := json.Marshal(session)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to serialize session: %v", err)
|
|
}
|
|
|
|
sessionPath := f.getSessionPath(sessionId)
|
|
|
|
// Store in filer
|
|
return f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
|
|
request := &filer_pb.CreateEntryRequest{
|
|
Directory: f.basePath,
|
|
Entry: &filer_pb.Entry{
|
|
Name: f.getSessionFileName(sessionId),
|
|
IsDirectory: false,
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
Mtime: time.Now().Unix(),
|
|
Crtime: time.Now().Unix(),
|
|
FileMode: uint32(0600), // Read/write for owner only
|
|
Uid: uint32(0),
|
|
Gid: uint32(0),
|
|
},
|
|
Content: sessionData,
|
|
},
|
|
}
|
|
|
|
glog.V(3).Infof("Storing session %s at %s", sessionId, sessionPath)
|
|
_, err := client.CreateEntry(ctx, request)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to store session %s: %v", sessionId, err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// GetSession retrieves session information from filer
|
|
func (f *FilerSessionStore) GetSession(ctx context.Context, filerAddress string, sessionId string) (*SessionInfo, error) {
|
|
if filerAddress == "" {
|
|
return nil, fmt.Errorf(ErrFilerAddressRequired)
|
|
}
|
|
if sessionId == "" {
|
|
return nil, fmt.Errorf(ErrSessionIDCannotBeEmpty)
|
|
}
|
|
|
|
var sessionData []byte
|
|
err := f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
|
|
request := &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: f.basePath,
|
|
Name: f.getSessionFileName(sessionId),
|
|
}
|
|
|
|
glog.V(3).Infof("Looking up session %s", sessionId)
|
|
response, err := client.LookupDirectoryEntry(ctx, request)
|
|
if err != nil {
|
|
return fmt.Errorf("session not found: %v", err)
|
|
}
|
|
|
|
if response.Entry == nil {
|
|
return fmt.Errorf("session not found")
|
|
}
|
|
|
|
sessionData = response.Entry.Content
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Deserialize session from JSON
|
|
var session SessionInfo
|
|
if err := json.Unmarshal(sessionData, &session); err != nil {
|
|
return nil, fmt.Errorf("failed to deserialize session: %v", err)
|
|
}
|
|
|
|
// Check if session has expired
|
|
if time.Now().After(session.ExpiresAt) {
|
|
// Clean up expired session
|
|
_ = f.RevokeSession(ctx, filerAddress, sessionId)
|
|
return nil, fmt.Errorf("session has expired")
|
|
}
|
|
|
|
return &session, nil
|
|
}
|
|
|
|
// RevokeSession revokes a session from filer
|
|
func (f *FilerSessionStore) RevokeSession(ctx context.Context, filerAddress string, sessionId string) error {
|
|
if filerAddress == "" {
|
|
return fmt.Errorf(ErrFilerAddressRequired)
|
|
}
|
|
if sessionId == "" {
|
|
return fmt.Errorf(ErrSessionIDCannotBeEmpty)
|
|
}
|
|
|
|
return f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
|
|
request := &filer_pb.DeleteEntryRequest{
|
|
Directory: f.basePath,
|
|
Name: f.getSessionFileName(sessionId),
|
|
IsDeleteData: true,
|
|
IsRecursive: false,
|
|
IgnoreRecursiveError: false,
|
|
}
|
|
|
|
glog.V(3).Infof("Revoking session %s", sessionId)
|
|
resp, err := client.DeleteEntry(ctx, request)
|
|
if err != nil {
|
|
// Ignore "not found" errors - session may already be deleted
|
|
if strings.Contains(err.Error(), "not found") {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("failed to revoke session %s: %v", sessionId, err)
|
|
}
|
|
|
|
// Check response error
|
|
if resp.Error != "" {
|
|
// Ignore "not found" errors - session may already be deleted
|
|
if strings.Contains(resp.Error, "not found") {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("failed to revoke session %s: %s", sessionId, resp.Error)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// CleanupExpiredSessions removes expired sessions from filer
|
|
func (f *FilerSessionStore) CleanupExpiredSessions(ctx context.Context, filerAddress string) error {
|
|
if filerAddress == "" {
|
|
return fmt.Errorf(ErrFilerAddressRequired)
|
|
}
|
|
|
|
now := time.Now()
|
|
expiredCount := 0
|
|
|
|
err := f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
|
|
// List all entries in the session directory
|
|
request := &filer_pb.ListEntriesRequest{
|
|
Directory: f.basePath,
|
|
Prefix: "session_",
|
|
StartFromFileName: "",
|
|
InclusiveStartFrom: false,
|
|
Limit: 1000, // Process in batches of 1000
|
|
}
|
|
|
|
stream, err := client.ListEntries(ctx, request)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list sessions: %v", err)
|
|
}
|
|
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
break // End of stream or error
|
|
}
|
|
|
|
if resp.Entry == nil || resp.Entry.IsDirectory {
|
|
continue
|
|
}
|
|
|
|
// Parse session data to check expiration
|
|
var session SessionInfo
|
|
if err := json.Unmarshal(resp.Entry.Content, &session); err != nil {
|
|
glog.V(2).Infof("Failed to parse session file %s, deleting: %v", resp.Entry.Name, err)
|
|
// Delete corrupted session file
|
|
f.deleteSessionFile(ctx, client, resp.Entry.Name)
|
|
continue
|
|
}
|
|
|
|
// Check if session is expired
|
|
if now.After(session.ExpiresAt) {
|
|
glog.V(3).Infof("Cleaning up expired session: %s", resp.Entry.Name)
|
|
if err := f.deleteSessionFile(ctx, client, resp.Entry.Name); err != nil {
|
|
glog.V(1).Infof("Failed to delete expired session %s: %v", resp.Entry.Name, err)
|
|
} else {
|
|
expiredCount++
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if expiredCount > 0 {
|
|
glog.V(2).Infof("Cleaned up %d expired sessions", expiredCount)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Helper methods
|
|
|
|
// withFilerClient executes a function with a filer client
|
|
func (f *FilerSessionStore) withFilerClient(filerAddress string, fn func(client filer_pb.SeaweedFilerClient) error) error {
|
|
if filerAddress == "" {
|
|
return fmt.Errorf(ErrFilerAddressRequired)
|
|
}
|
|
|
|
// Use the pb.WithGrpcFilerClient helper similar to existing SeaweedFS code
|
|
return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(filerAddress), f.grpcDialOption, fn)
|
|
}
|
|
|
|
// getSessionPath returns the full path for a session
|
|
func (f *FilerSessionStore) getSessionPath(sessionId string) string {
|
|
return f.basePath + "/" + f.getSessionFileName(sessionId)
|
|
}
|
|
|
|
// getSessionFileName returns the filename for a session
|
|
func (f *FilerSessionStore) getSessionFileName(sessionId string) string {
|
|
return "session_" + sessionId + ".json"
|
|
}
|
|
|
|
// deleteSessionFile deletes a session file
|
|
func (f *FilerSessionStore) deleteSessionFile(ctx context.Context, client filer_pb.SeaweedFilerClient, fileName string) error {
|
|
request := &filer_pb.DeleteEntryRequest{
|
|
Directory: f.basePath,
|
|
Name: fileName,
|
|
IsDeleteData: true,
|
|
IsRecursive: false,
|
|
IgnoreRecursiveError: false,
|
|
}
|
|
|
|
_, err := client.DeleteEntry(ctx, request)
|
|
return err
|
|
}
|