Chris Lu
3 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 286 additions and 52 deletions
-
4docker/compose/fluent.json
-
20docker/compose/local-auditlog-compose.yml
-
3go.mod
-
7go.sum
-
1weed/command/filer.go
-
7weed/command/s3.go
-
1weed/command/server.go
-
2weed/s3api/auth_credentials.go
-
3weed/s3api/chunked_reader_v4.go
-
17weed/s3api/http/header.go
-
11weed/s3api/s3api_bucket_handlers.go
-
1weed/s3api/s3api_handlers.go
-
5weed/s3api/s3api_object_copy_handlers.go
-
54weed/s3api/s3api_object_handlers.go
-
1weed/s3api/s3api_object_handlers_postpolicy.go
-
13weed/s3api/s3api_object_multipart_handlers.go
-
10weed/s3api/s3api_object_tagging_handlers.go
-
4weed/s3api/s3api_objects_list_handlers.go
-
2weed/s3api/s3api_server.go
-
170weed/s3api/s3err/audit_fluent.go
-
2weed/s3api/s3err/error_handler.go
@ -0,0 +1,4 @@ |
|||
{ |
|||
"fluent_port": 24224, |
|||
"fluent_host": "fluent" |
|||
} |
@ -0,0 +1,20 @@ |
|||
version: '2' |
|||
|
|||
services: |
|||
server: |
|||
image: chrislusf/seaweedfs:local |
|||
ports: |
|||
- 8333:8333 |
|||
- 9333:9333 |
|||
- 19333:19333 |
|||
- 8084:8080 |
|||
- 18084:18080 |
|||
- 8888:8888 |
|||
- 18888:18888 |
|||
command: "server -ip=server -filer -s3 -s3.auditLogConfig=/etc/seaweedfs/fluent.json -volume.max=0 -master.volumeSizeLimitMB=8 -volume.preStopSeconds=1" |
|||
volumes: |
|||
- ./fluent.json:/etc/seaweedfs/fluent.json |
|||
fluent: |
|||
image: fluent/fluentd:v1.14 |
|||
ports: |
|||
- 24224:24224 |
@ -0,0 +1,170 @@ |
|||
package s3err |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" |
|||
"github.com/fluent/fluent-logger-golang/fluent" |
|||
"net/http" |
|||
"os" |
|||
"time" |
|||
) |
|||
|
|||
type AccessLogExtend struct { |
|||
AccessLog |
|||
AccessLogHTTP |
|||
} |
|||
|
|||
type AccessLog struct { |
|||
Bucket string `msg:"bucket" json:"bucket"` // awsexamplebucket1
|
|||
Time int64 `msg:"time" json:"time"` // [06/Feb/2019:00:00:38 +0000]
|
|||
RemoteIP string `msg:"remote_ip" json:"remote_ip,omitempty"` // 192.0.2.3
|
|||
Requester string `msg:"requester" json:"requester,omitempty"` // IAM user id
|
|||
RequestID string `msg:"request_id" json:"request_id,omitempty"` // 3E57427F33A59F07
|
|||
Operation string `msg:"operation" json:"operation,omitempty"` // REST.HTTP_method.resource_type REST.PUT.OBJECT
|
|||
Key string `msg:"key" json:"key,omitempty"` // /photos/2019/08/puppy.jpg
|
|||
ErrorCode string `msg:"error_code" json:"error_code,omitempty"` |
|||
HostId string `msg:"host_id" json:"host_id,omitempty"` |
|||
HostHeader string `msg:"host_header" json:"host_header,omitempty"` // s3.us-west-2.amazonaws.com
|
|||
UserAgent string `msg:"user_agent" json:"user_agent,omitempty"` |
|||
HTTPStatus int `msg:"status" json:"status,omitempty"` |
|||
SignatureVersion string `msg:"signature_version" json:"signature_version,omitempty"` |
|||
} |
|||
|
|||
type AccessLogHTTP struct { |
|||
RequestURI string `json:"request_uri,omitempty"` // "GET /awsexamplebucket1/photos/2019/08/puppy.jpg?x-foo=bar HTTP/1.1"
|
|||
BytesSent string `json:"bytes_sent,omitempty"` |
|||
ObjectSize string `json:"object_size,omitempty"` |
|||
TotalTime int `json:"total_time,omitempty"` |
|||
TurnAroundTime int `json:"turn_around_time,omitempty"` |
|||
Referer string `json:"Referer,omitempty"` |
|||
VersionId string `json:"version_id,omitempty"` |
|||
CipherSuite string `json:"cipher_suite,omitempty"` |
|||
AuthenticationType string `json:"auth_type,omitempty"` |
|||
TLSVersion string `json:"TLS_version,omitempty"` |
|||
} |
|||
|
|||
const tag = "s3.access" |
|||
|
|||
var ( |
|||
Logger *fluent.Fluent |
|||
hostname = os.Getenv("HOSTNAME") |
|||
) |
|||
|
|||
func InitAuditLog(config string) { |
|||
configContent, readErr := os.ReadFile(config) |
|||
if readErr != nil { |
|||
glog.Fatalf("fail to read fluent config %s : %v", config, readErr) |
|||
} |
|||
var fluentConfig fluent.Config |
|||
if err := json.Unmarshal(configContent, &fluentConfig); err != nil { |
|||
glog.Fatalf("fail to parse fluent config %s : %v", config, err) |
|||
} |
|||
var err error |
|||
Logger, err = fluent.New(fluentConfig) |
|||
if err != nil { |
|||
glog.Fatalf("fail to load fluent config: %v", err) |
|||
} |
|||
} |
|||
|
|||
func getREST(httpMetod string, resourceType string) string { |
|||
return fmt.Sprintf("REST.%s.%s", httpMetod, resourceType) |
|||
} |
|||
|
|||
func getResourceType(object string, query_key string, metod string) (string, bool) { |
|||
if object == "/" { |
|||
switch query_key { |
|||
case "delete": |
|||
return "BATCH.DELETE.OBJECT", true |
|||
case "tagging": |
|||
return getREST(metod, "OBJECTTAGGING"), true |
|||
case "lifecycle": |
|||
return getREST(metod, "LIFECYCLECONFIGURATION"), true |
|||
case "acl": |
|||
return getREST(metod, "ACCESSCONTROLPOLICY"), true |
|||
case "policy": |
|||
return getREST(metod, "BUCKETPOLICY"), true |
|||
default: |
|||
return getREST(metod, "BUCKET"), false |
|||
} |
|||
} else { |
|||
switch query_key { |
|||
case "tagging": |
|||
return getREST(metod, "OBJECTTAGGING"), true |
|||
default: |
|||
return getREST(metod, "OBJECT"), false |
|||
} |
|||
} |
|||
} |
|||
|
|||
func getOperation(object string, r *http.Request) string { |
|||
queries := r.URL.Query() |
|||
var operation string |
|||
var queryFound bool |
|||
for key, _ := range queries { |
|||
operation, queryFound = getResourceType(object, key, r.Method) |
|||
if queryFound { |
|||
return operation |
|||
} |
|||
} |
|||
if len(queries) == 0 { |
|||
operation, _ = getResourceType(object, "", r.Method) |
|||
} |
|||
return operation |
|||
} |
|||
|
|||
func GetAccessHttpLog(r *http.Request, statusCode int, s3errCode ErrorCode) AccessLogHTTP { |
|||
return AccessLogHTTP{ |
|||
RequestURI: r.RequestURI, |
|||
Referer: r.Header.Get("Referer"), |
|||
} |
|||
} |
|||
|
|||
func GetAccessLog(r *http.Request, HTTPStatusCode int, s3errCode ErrorCode) *AccessLog { |
|||
bucket, key := xhttp.GetBucketAndObject(r) |
|||
var errorCode string |
|||
if s3errCode != ErrNone { |
|||
errorCode = GetAPIError(s3errCode).Code |
|||
} |
|||
remoteIP := r.Header.Get("X-Real-IP") |
|||
if len(remoteIP) == 0 { |
|||
remoteIP = r.RemoteAddr |
|||
} |
|||
hostHeader := r.Header.Get("Host") |
|||
if len(hostHeader) == 0 { |
|||
hostHeader = r.URL.Hostname() |
|||
} |
|||
return &AccessLog{ |
|||
HostHeader: hostHeader, |
|||
RequestID: r.Header.Get("X-Request-ID"), |
|||
RemoteIP: remoteIP, |
|||
Requester: r.Header.Get(xhttp.AmzIdentityId), |
|||
UserAgent: r.Header.Get("UserAgent"), |
|||
HostId: hostname, |
|||
Bucket: bucket, |
|||
HTTPStatus: HTTPStatusCode, |
|||
Time: time.Now().Unix(), |
|||
Key: key, |
|||
Operation: getOperation(key, r), |
|||
ErrorCode: errorCode, |
|||
} |
|||
} |
|||
|
|||
func PostLog(r *http.Request, HTTPStatusCode int, errorCode ErrorCode) { |
|||
if Logger == nil { |
|||
return |
|||
} |
|||
if err := Logger.Post(tag, *GetAccessLog(r, HTTPStatusCode, errorCode)); err != nil { |
|||
glog.Warning("Error while posting log: ", err) |
|||
} |
|||
} |
|||
|
|||
func PostAccessLog(log *AccessLog) { |
|||
if Logger == nil || log == nil { |
|||
return |
|||
} |
|||
if err := Logger.Post(tag, *log); err != nil { |
|||
glog.Warning("Error while posting log: ", err) |
|||
} |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue