|
@ -25,6 +25,7 @@ type S3Sink struct { |
|
|
region string |
|
|
region string |
|
|
bucket string |
|
|
bucket string |
|
|
dir string |
|
|
dir string |
|
|
|
|
|
endpoint string |
|
|
filerSource *source.FilerSource |
|
|
filerSource *source.FilerSource |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -44,12 +45,14 @@ func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string |
|
|
glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region")) |
|
|
glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region")) |
|
|
glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket")) |
|
|
glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket")) |
|
|
glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory")) |
|
|
glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory")) |
|
|
|
|
|
glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint")) |
|
|
return s3sink.initialize( |
|
|
return s3sink.initialize( |
|
|
configuration.GetString(prefix+"aws_access_key_id"), |
|
|
configuration.GetString(prefix+"aws_access_key_id"), |
|
|
configuration.GetString(prefix+"aws_secret_access_key"), |
|
|
configuration.GetString(prefix+"aws_secret_access_key"), |
|
|
configuration.GetString(prefix+"region"), |
|
|
configuration.GetString(prefix+"region"), |
|
|
configuration.GetString(prefix+"bucket"), |
|
|
configuration.GetString(prefix+"bucket"), |
|
|
configuration.GetString(prefix+"directory"), |
|
|
configuration.GetString(prefix+"directory"), |
|
|
|
|
|
configuration.GetString(prefix+"endpoint"), |
|
|
) |
|
|
) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -57,13 +60,15 @@ func (s3sink *S3Sink) SetSourceFiler(s *source.FilerSource) { |
|
|
s3sink.filerSource = s |
|
|
s3sink.filerSource = s |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string) error { |
|
|
|
|
|
|
|
|
func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir, endpoint string) error { |
|
|
s3sink.region = region |
|
|
s3sink.region = region |
|
|
s3sink.bucket = bucket |
|
|
s3sink.bucket = bucket |
|
|
s3sink.dir = dir |
|
|
s3sink.dir = dir |
|
|
|
|
|
s3sink.endpoint = endpoint |
|
|
|
|
|
|
|
|
config := &aws.Config{ |
|
|
config := &aws.Config{ |
|
|
Region: aws.String(s3sink.region), |
|
|
|
|
|
|
|
|
Region: aws.String(s3sink.region), |
|
|
|
|
|
Endpoint: aws.String(s3sink.endpoint), |
|
|
} |
|
|
} |
|
|
if awsAccessKeyId != "" && awsSecretAccessKey != "" { |
|
|
if awsAccessKeyId != "" && awsSecretAccessKey != "" { |
|
|
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") |
|
|
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") |
|
|