You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
113 lines
3.1 KiB
113 lines
3.1 KiB
package broker
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
|
"github.com/chrislusf/seaweedfs/weed/operation"
|
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
|
|
"github.com/chrislusf/seaweedfs/weed/security"
|
|
"github.com/chrislusf/seaweedfs/weed/util"
|
|
)
|
|
|
|
func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
|
|
|
|
assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
|
|
if err2 != nil {
|
|
return err2
|
|
}
|
|
|
|
dir, name := util.FullPath(targetFile).DirAndName()
|
|
|
|
chunk := &filer_pb.FileChunk{
|
|
FileId: assignResult.Fid,
|
|
Offset: 0, // needs to be fixed during appending
|
|
Size: uint64(uploadResult.Size),
|
|
Mtime: time.Now().UnixNano(),
|
|
ETag: uploadResult.ETag,
|
|
IsGzipped: uploadResult.Gzip > 0,
|
|
}
|
|
|
|
// append the chunk
|
|
if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
|
|
request := &filer_pb.AppendToEntryRequest{
|
|
Directory: dir,
|
|
EntryName: name,
|
|
Chunks: []*filer_pb.FileChunk{chunk},
|
|
}
|
|
|
|
_, err := client.AppendToEntry(context.Background(), request)
|
|
if err != nil {
|
|
glog.V(0).Infof("append to file %v: %v", request, err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return fmt.Errorf("append to file %v: %v", targetFile, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
|
|
|
|
var assignResult = &operation.AssignResult{}
|
|
|
|
// assign a volume location
|
|
if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
|
|
request := &filer_pb.AssignVolumeRequest{
|
|
Count: 1,
|
|
Replication: topicConfig.Replication,
|
|
Collection: topicConfig.Collection,
|
|
}
|
|
|
|
resp, err := client.AssignVolume(context.Background(), request)
|
|
if err != nil {
|
|
glog.V(0).Infof("assign volume failure %v: %v", request, err)
|
|
return err
|
|
}
|
|
if resp.Error != "" {
|
|
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
|
|
}
|
|
|
|
assignResult.Auth = security.EncodedJwt(resp.Auth)
|
|
assignResult.Fid = resp.FileId
|
|
assignResult.Url = resp.Url
|
|
assignResult.PublicUrl = resp.PublicUrl
|
|
assignResult.Count = uint64(resp.Count)
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// upload data
|
|
targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
|
|
uploadResult, err := operation.UploadData(targetUrl, "", broker.option.Cipher, data, false, "", nil, assignResult.Auth)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
|
|
}
|
|
// println("uploaded to", targetUrl)
|
|
return assignResult, uploadResult, nil
|
|
}
|
|
|
|
func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
|
|
|
|
for _, filer := range broker.option.Filers {
|
|
if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
|
|
glog.V(0).Infof("fail to connect to %s: %v", filer, err)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
return
|
|
|
|
}
|