You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
130 lines
3.5 KiB
130 lines
3.5 KiB
package broker
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/chrislusf/seaweedfs/weed/security"
|
|
"io"
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
|
"github.com/chrislusf/seaweedfs/weed/operation"
|
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
|
"github.com/chrislusf/seaweedfs/weed/util"
|
|
)
|
|
|
|
func (broker *MessageQueueBroker) appendToFile(targetFile string, topicConfig *mq_pb.TopicConfiguration, data []byte) error {
|
|
|
|
assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
|
|
if err2 != nil {
|
|
return err2
|
|
}
|
|
|
|
dir, name := util.FullPath(targetFile).DirAndName()
|
|
|
|
// append the chunk
|
|
if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
|
|
request := &filer_pb.AppendToEntryRequest{
|
|
Directory: dir,
|
|
EntryName: name,
|
|
Chunks: []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)},
|
|
}
|
|
|
|
_, err := client.AppendToEntry(context.Background(), request)
|
|
if err != nil {
|
|
glog.V(0).Infof("append to file %v: %v", request, err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return fmt.Errorf("append to file %v: %v", targetFile, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (broker *MessageQueueBroker) assignAndUpload(topicConfig *mq_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
|
|
|
|
var assignResult = &operation.AssignResult{}
|
|
|
|
// assign a volume location
|
|
if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
|
|
assignErr := util.Retry("assignVolume", func() error {
|
|
request := &filer_pb.AssignVolumeRequest{
|
|
Count: 1,
|
|
Replication: topicConfig.Replication,
|
|
Collection: topicConfig.Collection,
|
|
}
|
|
|
|
resp, err := client.AssignVolume(context.Background(), request)
|
|
if err != nil {
|
|
glog.V(0).Infof("assign volume failure %v: %v", request, err)
|
|
return err
|
|
}
|
|
if resp.Error != "" {
|
|
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
|
|
}
|
|
|
|
assignResult.Auth = security.EncodedJwt(resp.Auth)
|
|
assignResult.Fid = resp.FileId
|
|
assignResult.Url = resp.Location.Url
|
|
assignResult.PublicUrl = resp.Location.PublicUrl
|
|
assignResult.GrpcPort = int(resp.Location.GrpcPort)
|
|
assignResult.Count = uint64(resp.Count)
|
|
|
|
return nil
|
|
})
|
|
if assignErr != nil {
|
|
return assignErr
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// upload data
|
|
targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
|
|
uploadOption := &operation.UploadOption{
|
|
UploadUrl: targetUrl,
|
|
Filename: "",
|
|
Cipher: broker.option.Cipher,
|
|
IsInputCompressed: false,
|
|
MimeType: "",
|
|
PairMap: nil,
|
|
Jwt: assignResult.Auth,
|
|
}
|
|
uploadResult, err := operation.UploadData(data, uploadOption)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
|
|
}
|
|
// println("uploaded to", targetUrl)
|
|
return assignResult, uploadResult, nil
|
|
}
|
|
|
|
var _ = filer_pb.FilerClient(&MessageQueueBroker{})
|
|
|
|
func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
|
|
|
|
for _, filer := range broker.option.Filers {
|
|
if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil {
|
|
if err == io.EOF {
|
|
return
|
|
}
|
|
glog.V(0).Infof("fail to connect to %s: %v", filer, err)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string {
|
|
return location.Url
|
|
}
|