|
|
@ -26,6 +26,7 @@ type S3Sink struct { |
|
|
|
bucket string |
|
|
|
dir string |
|
|
|
endpoint string |
|
|
|
acl string |
|
|
|
filerSource *source.FilerSource |
|
|
|
isIncremental bool |
|
|
|
} |
|
|
@ -51,6 +52,7 @@ func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string |
|
|
|
glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket")) |
|
|
|
glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory")) |
|
|
|
glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint")) |
|
|
|
glog.V(0).Infof("sink.s3.acl: %v", configuration.GetString(prefix+"acl")) |
|
|
|
glog.V(0).Infof("sink.s3.is_incremental: %v", configuration.GetString(prefix+"is_incremental")) |
|
|
|
s3sink.isIncremental = configuration.GetBool(prefix + "is_incremental") |
|
|
|
return s3sink.initialize( |
|
|
@ -60,6 +62,7 @@ func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string |
|
|
|
configuration.GetString(prefix+"bucket"), |
|
|
|
configuration.GetString(prefix+"directory"), |
|
|
|
configuration.GetString(prefix+"endpoint"), |
|
|
|
configuration.GetString(prefix+"acl"), |
|
|
|
) |
|
|
|
} |
|
|
|
|
|
|
@ -67,11 +70,12 @@ func (s3sink *S3Sink) SetSourceFiler(s *source.FilerSource) { |
|
|
|
s3sink.filerSource = s |
|
|
|
} |
|
|
|
|
|
|
|
func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir, endpoint string) error { |
|
|
|
func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir, endpoint, acl string) error { |
|
|
|
s3sink.region = region |
|
|
|
s3sink.bucket = bucket |
|
|
|
s3sink.dir = dir |
|
|
|
s3sink.endpoint = endpoint |
|
|
|
s3sink.acl = acl |
|
|
|
|
|
|
|
config := &aws.Config{ |
|
|
|
Region: aws.String(s3sink.region), |
|
|
|