Browse Source

also migrate jsonpb

pull/3461/head
chrislu 2 years ago
parent
commit
2b580a7566
  1. 7
      weed/command/filer_meta_tail.go
  2. 2
      weed/filer/entry_codec.go
  3. 12
      weed/filer/filer_conf.go
  4. 21
      weed/filer/s3iam_conf.go
  5. 4
      weed/mq/broker/broker_segment_serde.go
  6. 7
      weed/notification/aws_sqs/aws_sqs_pub.go
  7. 2
      weed/pb/proto_read_write_test.go
  8. 2
      weed/replication/sub/notification_aws_sqs.go
  9. 2
      weed/s3api/auth_credentials_test.go
  10. 20
      weed/shell/command_fs_meta_cat.go
  11. 10
      weed/shell/command_remote_configure.go
  12. 13
      weed/shell/command_remote_mount.go
  13. 13
      weed/storage/volume_info/volume_info.go
  14. 2
      weed/util/skiplist/Makefile
  15. 11
      weed/util/skiplist/skiplist.pb.go

7
weed/command/filer_meta_tail.go

@ -2,8 +2,8 @@ package command
import ( import (
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/protobuf/jsonpb"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@ -88,11 +88,8 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
return false return false
} }
jsonpbMarshaler := jsonpb.Marshaler{
EmitDefaults: false,
}
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
jsonpbMarshaler.Marshal(os.Stdout, resp)
filer.ProtoToText(os.Stdout, resp)
fmt.Fprintln(os.Stdout) fmt.Fprintln(os.Stdout)
return nil return nil
} }

2
weed/filer/entry_codec.go

@ -21,7 +21,7 @@ func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error {
message := &filer_pb.Entry{} message := &filer_pb.Entry{}
if err := proto.UnmarshalMerge(blob, message); err != nil {
if err := proto.Unmarshal(blob, message); err != nil {
return fmt.Errorf("decoding value blob for %s: %v", entry.FullPath, err) return fmt.Errorf("decoding value blob for %s: %v", entry.FullPath, err)
} }

12
weed/filer/filer_conf.go

@ -13,7 +13,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"github.com/viant/ptrie" "github.com/viant/ptrie"
"google.golang.org/protobuf/jsonpb"
jsonpb "google.golang.org/protobuf/encoding/protojson"
) )
const ( const (
@ -93,7 +93,7 @@ func (fc *FilerConf) loadFromChunks(filer *Filer, content []byte, chunks []*file
func (fc *FilerConf) LoadFromBytes(data []byte) (err error) { func (fc *FilerConf) LoadFromBytes(data []byte) (err error) {
conf := &filer_pb.FilerConf{} conf := &filer_pb.FilerConf{}
if err := jsonpb.Unmarshal(bytes.NewReader(data), conf); err != nil {
if err := jsonpb.Unmarshal(data, conf); err != nil {
return err return err
} }
@ -181,11 +181,5 @@ func (fc *FilerConf) ToProto() *filer_pb.FilerConf {
} }
func (fc *FilerConf) ToText(writer io.Writer) error { func (fc *FilerConf) ToText(writer io.Writer) error {
m := jsonpb.Marshaler{
EmitDefaults: false,
Indent: " ",
}
return m.Marshal(writer, fc.ToProto())
return ProtoToText(writer, fc.ToProto())
} }

21
weed/filer/s3iam_conf.go

@ -1,17 +1,16 @@
package filer package filer
import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"google.golang.org/protobuf/jsonpb"
jsonpb "google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
func ParseS3ConfigurationFromBytes[T proto.Message](content []byte, config T) error { func ParseS3ConfigurationFromBytes[T proto.Message](content []byte, config T) error {
if err := jsonpb.Unmarshal(bytes.NewBuffer(content), config); err != nil {
if err := jsonpb.Unmarshal(content, config); err != nil {
return err return err
} }
return nil return nil
@ -19,12 +18,22 @@ func ParseS3ConfigurationFromBytes[T proto.Message](content []byte, config T) er
func ProtoToText(writer io.Writer, config proto.Message) error { func ProtoToText(writer io.Writer, config proto.Message) error {
m := jsonpb.Marshaler{
EmitDefaults: false,
m := jsonpb.MarshalOptions{
EmitUnpopulated: true,
Indent: " ", Indent: " ",
} }
return m.Marshal(writer, config)
text, marshalErr := m.Marshal(config)
if marshalErr != nil {
return fmt.Errorf("marshal proto message: %v", marshalErr)
}
_, writeErr := writer.Write(text)
if writeErr != nil {
return fmt.Errorf("fail to write proto message: %v", writeErr)
}
return writeErr
} }
// CheckDuplicateAccessKey returns an error message when s3cfg has duplicate access keys // CheckDuplicateAccessKey returns an error message when s3cfg has duplicate access keys

4
weed/mq/broker/broker_segment_serde.go

@ -8,7 +8,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/protobuf/jsonpb"
jsonpb "google.golang.org/protobuf/encoding/protojson"
"time" "time"
) )
@ -60,7 +60,7 @@ func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info
// parse into filer conf object // parse into filer conf object
info = &mq_pb.SegmentInfo{} info = &mq_pb.SegmentInfo{}
if err = jsonpb.Unmarshal(bytes.NewReader(data), info); err != nil {
if err = jsonpb.Unmarshal(data, info); err != nil {
return err return err
} }
found = true found = true

7
weed/notification/aws_sqs/aws_sqs_pub.go

@ -70,7 +70,10 @@ func (k *AwsSqsPub) initialize(awsAccessKeyId, awsSecretAccessKey, region, queue
func (k *AwsSqsPub) SendMessage(key string, message proto.Message) (err error) { func (k *AwsSqsPub) SendMessage(key string, message proto.Message) (err error) {
text := proto.MarshalTextString(message)
text, err := proto.Marshal(message)
if err != nil {
return fmt.Errorf("send message marshal %+v: %v", message, err)
}
_, err = k.svc.SendMessage(&sqs.SendMessageInput{ _, err = k.svc.SendMessage(&sqs.SendMessageInput{
DelaySeconds: aws.Int64(10), DelaySeconds: aws.Int64(10),
@ -80,7 +83,7 @@ func (k *AwsSqsPub) SendMessage(key string, message proto.Message) (err error) {
StringValue: aws.String(key), StringValue: aws.String(key),
}, },
}, },
MessageBody: aws.String(text),
MessageBody: aws.String(string(text)),
QueueUrl: &k.queueUrl, QueueUrl: &k.queueUrl,
}) })

2
weed/pb/proto_read_write_test.go

@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"google.golang.org/protobuf/jsonpb"
jsonpb "google.golang.org/protobuf/encoding/protojson"
) )
func TestJsonpMarshalUnmarshal(t *testing.T) { func TestJsonpMarshalUnmarshal(t *testing.T) {

2
weed/replication/sub/notification_aws_sqs.go

@ -98,7 +98,7 @@ func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotif
key = *keyValue.StringValue key = *keyValue.StringValue
text := *result.Messages[0].Body text := *result.Messages[0].Body
message = &filer_pb.EventNotification{} message = &filer_pb.EventNotification{}
err = proto.UnmarshalText(text, message)
err = proto.Unmarshal([]byte(text), message)
// delete the message // delete the message
_, err = k.svc.DeleteMessage(&sqs.DeleteMessageInput{ _, err = k.svc.DeleteMessage(&sqs.DeleteMessageInput{

2
weed/s3api/auth_credentials_test.go

@ -5,7 +5,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"testing" "testing"
"google.golang.org/protobuf/jsonpb"
jsonpb "google.golang.org/protobuf/encoding/protojson"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
) )

20
weed/shell/command_fs_meta_cat.go

@ -2,8 +2,7 @@ package shell
import ( import (
"fmt" "fmt"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/jsonpb"
"github.com/seaweedfs/seaweedfs/weed/filer"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"io" "io"
@ -50,22 +49,7 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
return err return err
} }
m := jsonpb.Marshaler{
EmitDefaults: true,
Indent: " ",
}
slices.SortFunc(respLookupEntry.Entry.Chunks, func(a, b *filer_pb.FileChunk) bool {
if a.Offset == b.Offset {
return a.Mtime < b.Mtime
}
return a.Offset < b.Offset
})
text, marshalErr := m.MarshalToString(respLookupEntry.Entry)
if marshalErr != nil {
return fmt.Errorf("marshal meta: %v", marshalErr)
}
fmt.Fprintf(writer, "%s\n", text)
filer.ProtoToText(writer, respLookupEntry.Entry)
bytes, _ := proto.Marshal(respLookupEntry.Entry) bytes, _ := proto.Marshal(respLookupEntry.Entry)
gzippedBytes, _ := util.GzipData(bytes) gzippedBytes, _ := util.GzipData(bytes)

10
weed/shell/command_remote_configure.go

@ -9,7 +9,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
"github.com/seaweedfs/seaweedfs/weed/remote_storage" "github.com/seaweedfs/seaweedfs/weed/remote_storage"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/jsonpb"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"io" "io"
"regexp" "regexp"
@ -159,15 +158,8 @@ func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandE
conf.TencentSecretKey = strings.Repeat("*", len(conf.TencentSecretKey)) conf.TencentSecretKey = strings.Repeat("*", len(conf.TencentSecretKey))
conf.WasabiSecretKey = strings.Repeat("*", len(conf.WasabiSecretKey)) conf.WasabiSecretKey = strings.Repeat("*", len(conf.WasabiSecretKey))
m := jsonpb.Marshaler{
EmitDefaults: false,
Indent: " ",
}
err := m.Marshal(writer, conf)
fmt.Fprintln(writer)
return filer.ProtoToText(writer, conf)
return err
}) })
} }

13
weed/shell/command_remote_mount.go

@ -9,7 +9,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
"github.com/seaweedfs/seaweedfs/weed/remote_storage" "github.com/seaweedfs/seaweedfs/weed/remote_storage"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/jsonpb"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"io" "io"
"os" "os"
@ -101,17 +100,7 @@ func listExistingRemoteStorageMounts(commandEnv *CommandEnv, writer io.Writer) (
} }
func jsonPrintln(writer io.Writer, message proto.Message) error { func jsonPrintln(writer io.Writer, message proto.Message) error {
if message == nil {
return nil
}
m := jsonpb.Marshaler{
EmitDefaults: false,
Indent: " ",
}
err := m.Marshal(writer, message)
fmt.Fprintln(writer)
return err
return filer.ProtoToText(writer, message)
} }
func syncMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *remote_pb.RemoteConf, remote *remote_pb.RemoteStorageLocation) error { func syncMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *remote_pb.RemoteConf, remote *remote_pb.RemoteStorageLocation) error {

13
weed/storage/volume_info/volume_info.go

@ -1,11 +1,10 @@
package volume_info package volume_info
import ( import (
"bytes"
"fmt" "fmt"
"os" "os"
"google.golang.org/protobuf/jsonpb"
jsonpb "google.golang.org/protobuf/encoding/protojson"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
@ -44,7 +43,7 @@ func MaybeLoadVolumeInfo(fileName string) (volumeInfo *volume_server_pb.VolumeIn
} }
glog.V(1).Infof("maybeLoadVolumeInfo Unmarshal volume info %v", fileName) glog.V(1).Infof("maybeLoadVolumeInfo Unmarshal volume info %v", fileName)
if err = jsonpb.Unmarshal(bytes.NewReader(tierData), volumeInfo); err != nil {
if err = jsonpb.Unmarshal(tierData, volumeInfo); err != nil {
glog.Warningf("unmarshal error: %v", err) glog.Warningf("unmarshal error: %v", err)
err = fmt.Errorf("unmarshal error: %v", err) err = fmt.Errorf("unmarshal error: %v", err)
return return
@ -65,17 +64,17 @@ func SaveVolumeInfo(fileName string, volumeInfo *volume_server_pb.VolumeInfo) er
return fmt.Errorf("%s not writable", fileName) return fmt.Errorf("%s not writable", fileName)
} }
m := jsonpb.Marshaler{
EmitDefaults: true,
m := jsonpb.MarshalOptions{
EmitUnpopulated: true,
Indent: " ", Indent: " ",
} }
text, marshalErr := m.MarshalToString(volumeInfo)
text, marshalErr := m.Marshal(volumeInfo)
if marshalErr != nil { if marshalErr != nil {
return fmt.Errorf("marshal to %s: %v", fileName, marshalErr) return fmt.Errorf("marshal to %s: %v", fileName, marshalErr)
} }
writeErr := util.WriteFile(fileName, []byte(text), 0755)
writeErr := util.WriteFile(fileName, text, 0755)
if writeErr != nil { if writeErr != nil {
return fmt.Errorf("fail to write %s : %v", fileName, writeErr) return fmt.Errorf("fail to write %s : %v", fileName, writeErr)
} }

2
weed/util/skiplist/Makefile

@ -3,4 +3,4 @@ all: gen
.PHONY : gen .PHONY : gen
gen: gen:
protoc skiplist.proto --go_out=plugins=grpc:. --go_opt=paths=source_relative
protoc skiplist.proto --go_out=. --go-grpc_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative

11
weed/util/skiplist/skiplist.pb.go

@ -1,13 +1,12 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.25.0
// protoc v3.12.3
// protoc-gen-go v1.26.0
// protoc v3.17.3
// source: skiplist.proto // source: skiplist.proto
package skiplist package skiplist
import ( import (
proto "google.golang.org/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect" reflect "reflect"
@ -21,10 +20,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
) )
// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
type SkipListProto struct { type SkipListProto struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
@ -325,7 +320,7 @@ var file_skiplist_proto_rawDesc = []byte{
0x22, 0x25, 0x0a, 0x0d, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x61, 0x74, 0x63, 0x68, 0x44, 0x61, 0x74, 0x22, 0x25, 0x0a, 0x0d, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x61, 0x74, 0x63, 0x68, 0x44, 0x61, 0x74,
0x61, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c, 0x61, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c,
0x52, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x52, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75,
0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f,
0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f,
0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x75, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x75,
0x74, 0x69, 0x6c, 0x2f, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72, 0x74, 0x69, 0x6c, 0x2f, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33, 0x6f, 0x74, 0x6f, 0x33,

Loading…
Cancel
Save