gRPCを使いそうなので記事にまとめておきます。
ちょっとだけ長いので必要なところだけ読めばいいと思います。 あとスマホで見ようとすると重いのでPCで見ることをおすすめします。
今回使うコードはリポジトリに置いてあります。 https://github.com/righ/grpc-go-example/
これはgrpc/grpc-go/ の examples をベースに修正と追記をしたものです。
clone して docker-compose すればコンテナが2つ起動します。
$ git clone https://github.com/righ/grpc-go-example/
$ cd grpc-go-example/
$ docker-compose up
Creating network "grpc_default" with the default driver
Creating go2 ... done
Creating go1 ... done
Attaching to go2, go1
以下の内容はすべてこれらのコンテナが起動している前提なので、手元で試したい方は clone してください。
🌀 とりあえず使ってみる
起動した2つのコンテナ(grpc-server,grpc-client)に入って プログラムをインストールします。
インストールしたプログラムは $GOPATH/bin/
に配置されるのでそのまま実行できます。
- grpc-server
- grpc-client
-
$ docker exec -it grpc-server /bin/bash root@82769e87d925:~# go get -u github.com/righ/grpc-go-example/helloworld/greeter_server root@82769e87d925:~# greeter_server 2020/02/01 12:34:44 Received: world 2020/02/01 12:34:51 Received: world
-
$ docker exec -it grpc-client /bin/bash root@189290506d75:~# go get -u github.com/righ/grpc-go-example/helloworld/greeter_client root@189290506d75:~# greeter_client 2020/02/01 12:34:44 Greeting: Hello world root@189290506d75:~# greeter_client 2020/02/01 12:34:51 Greeting: Hello world
クライアントプログラムを実行するたびに以下の処理が実行されているだけです。
- リクエストを受け取ったサーバはその内容を表示
- クライアントは返却されたレスポンスを表示
- info
- 以降のプログラムも、サーバは
grpc-server
コンテナ、 クライアントはgrpc-client
コンテナで動くことを前提に書いています。
- 以降のプログラムも、サーバは
この程度の通信プログラムくらいなら自力でも簡単に書くことができますよね。
何故 gRPC を使うと嬉しいのか。 gRPC 自体が高速だというのもあるのですが、別の理由としては Protocol Buffer の存在が大きいです。
📖 Protocol Buffer
まずは簡単な説明を。
Protocol Buffers(プロトコルバッファー)はインタフェース定義言語 (IDL) で構造を定義する通信や永続化での利用を目的としたシリアライズフォーマットであり、Googleにより開発されている。
gRPCはこの Protocol Buffer で定義された構造を使って通信を行います。
定義は下記(左)のようなテキストファイルです(拡張子を proto
とするのが慣例)
-
helloworld.proto
syntax = "proto3"; option java_multiple_files = true; option java_package = "io.grpc.examples.helloworld"; option java_outer_classname = "HelloWorldProto"; package helloworld; // The greeting service definition. service Greeter { // Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) {} } // The request message containing the user's name. message HelloRequest { string name = 1; } // The response message containing the greetings message HelloReply { string message = 1; }
-
helloworld.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT. // source: helloworld.proto package helloworld import ( context "context" fmt "fmt" proto "github.com/golang/protobuf/proto" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" math "math" ) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package // The request message containing the user's name. type HelloRequest struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *HelloRequest) Reset() { *m = HelloRequest{} } func (m *HelloRequest) String() string { return proto.CompactTextString(m) } func (*HelloRequest) ProtoMessage() {} func (*HelloRequest) Descriptor() ([]byte, []int) { return fileDescriptor_17b8c58d586b62f2, []int{0} } func (m *HelloRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_HelloRequest.Unmarshal(m, b) } func (m *HelloRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_HelloRequest.Marshal(b, m, deterministic) } func (m *HelloRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_HelloRequest.Merge(m, src) } func (m *HelloRequest) XXX_Size() int { return xxx_messageInfo_HelloRequest.Size(m) } func (m *HelloRequest) XXX_DiscardUnknown() { xxx_messageInfo_HelloRequest.DiscardUnknown(m) } var xxx_messageInfo_HelloRequest proto.InternalMessageInfo func (m *HelloRequest) GetName() string { if m != nil { return m.Name } return "" } // The response message containing the greetings type HelloReply struct { Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *HelloReply) Reset() { *m = HelloReply{} } func (m *HelloReply) String() string { return proto.CompactTextString(m) } func (*HelloReply) ProtoMessage() {} func (*HelloReply) Descriptor() ([]byte, []int) { return fileDescriptor_17b8c58d586b62f2, []int{1} } func (m *HelloReply) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_HelloReply.Unmarshal(m, b) } func (m *HelloReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_HelloReply.Marshal(b, m, deterministic) } func (m *HelloReply) XXX_Merge(src proto.Message) { xxx_messageInfo_HelloReply.Merge(m, src) } func (m *HelloReply) XXX_Size() int { return xxx_messageInfo_HelloReply.Size(m) } func (m *HelloReply) XXX_DiscardUnknown() { xxx_messageInfo_HelloReply.DiscardUnknown(m) } var xxx_messageInfo_HelloReply proto.InternalMessageInfo func (m *HelloReply) GetMessage() string { if m != nil { return m.Message } return "" } func init() { proto.RegisterType((*HelloRequest)(nil), "helloworld.HelloRequest") proto.RegisterType((*HelloReply)(nil), "helloworld.HelloReply") } func init() { proto.RegisterFile("helloworld.proto", fileDescriptor_17b8c58d586b62f2) } var fileDescriptor_17b8c58d586b62f2 = []byte{ // 175 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0xc8, 0x48, 0xcd, 0xc9, 0xc9, 0x2f, 0xcf, 0x2f, 0xca, 0x49, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x42, 0x88, 0x28, 0x29, 0x71, 0xf1, 0x78, 0x80, 0x78, 0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x42, 0x42, 0x5c, 0x2c, 0x79, 0x89, 0xb9, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x60, 0xb6, 0x92, 0x1a, 0x17, 0x17, 0x54, 0x4d, 0x41, 0x4e, 0xa5, 0x90, 0x04, 0x17, 0x7b, 0x6e, 0x6a, 0x71, 0x71, 0x62, 0x3a, 0x4c, 0x11, 0x8c, 0x6b, 0xe4, 0xc9, 0xc5, 0xee, 0x5e, 0x94, 0x9a, 0x5a, 0x92, 0x5a, 0x24, 0x64, 0xc7, 0xc5, 0x11, 0x9c, 0x58, 0x09, 0xd6, 0x25, 0x24, 0xa1, 0x87, 0xe4, 0x02, 0x64, 0xcb, 0xa4, 0xc4, 0xb0, 0xc8, 0x14, 0xe4, 0x54, 0x2a, 0x31, 0x38, 0x19, 0x70, 0x49, 0x67, 0xe6, 0xeb, 0xa5, 0x17, 0x15, 0x24, 0xeb, 0xa5, 0x56, 0x24, 0xe6, 0x16, 0xe4, 0xa4, 0x16, 0x23, 0xa9, 0x75, 0xe2, 0x07, 0x2b, 0x0e, 0x07, 0xb1, 0x03, 0x40, 0x5e, 0x0a, 0x60, 0x4c, 0x62, 0x03, 0xfb, 0xcd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x0f, 0xb7, 0xcd, 0xf2, 0xef, 0x00, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConnInterface // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion6 // GreeterClient is the client API for Greeter service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type GreeterClient interface { // Sends a greeting SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) } type greeterClient struct { cc grpc.ClientConnInterface } func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient { return &greeterClient{cc} } func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) { out := new(HelloReply) err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...) if err != nil { return nil, err } return out, nil } // GreeterServer is the server API for Greeter service. type GreeterServer interface { // Sends a greeting SayHello(context.Context, *HelloRequest) (*HelloReply, error) } // UnimplementedGreeterServer can be embedded to have forward compatible implementations. type UnimplementedGreeterServer struct { } func (*UnimplementedGreeterServer) SayHello(ctx context.Context, req *HelloRequest) (*HelloReply, error) { return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented") } func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { s.RegisterService(&_Greeter_serviceDesc, srv) } func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(HelloRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(GreeterServer).SayHello(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/helloworld.Greeter/SayHello", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest)) } return interceptor(ctx, in, info, handler) } var _Greeter_serviceDesc = grpc.ServiceDesc{ ServiceName: "helloworld.Greeter", HandlerType: (*GreeterServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "SayHello", Handler: _Greeter_SayHello_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "helloworld.proto", }
この定義をもとにコードを自動生成(上記右)して、自動生成されたコード(パラメータなど)を使って実装することで、定義した構造を用いて通信できるというわけです。
このコードを自動生成するのが protoc
コマンドです。
Macであれば brew を使ってインストールすることもできますが、 それ以外の環境を利用する場合は Releases からダウンロードしてきましょう
今回は 簡単なスクリプト を使ってインストールします。これで protoc コマンドが使えるようになります。
root@2b588693c971:~# ./protoc_install.sh
root@2b588693c971:~# protoc --version
libprotoc 3.11.2
# protoファイルが置いてあるディレクトリに移動
root@2b588693c971:~# cd helloworld/helloworld
root@2b588693c971:~/helloworld/helloworld# protoc --go_out=plugins=grpc:. helloworld.proto
# 指定したディレクトリに `ファイル名.pb.go` が作られる
root@2b588693c971:~/helloworld/helloworld# ls
helloworld.pb.go helloworld.proto
# オプション
root@2b588693c971:~/helloworld/helloworld# protoc -help
Usage: protoc [OPTION] PROTO_FILES
Parse PROTO_FILES and generate output based on the options given:
-IPATH, --proto_path=PATH Specify the directory in which to search for
imports. May be specified multiple times;
directories will be searched in order. If not
given, the current working directory is used.
If not found in any of the these directories,
the --descriptor_set_in descriptors will be
checked for required proto file.
--version Show version info and exit.
-h, --help Show this text and exit.
--encode=MESSAGE_TYPE Read a text-format message of the given type
from standard input and write it in binary
to standard output. The message type must
be defined in PROTO_FILES or their imports.
--decode=MESSAGE_TYPE Read a binary message of the given type from
standard input and write it in text format
to standard output. The message type must
be defined in PROTO_FILES or their imports.
--decode_raw Read an arbitrary protocol message from
standard input and write the raw tag/value
pairs in text format to standard output. No
PROTO_FILES should be given when using this
flag.
--descriptor_set_in=FILES Specifies a delimited list of FILES
each containing a FileDescriptorSet (a
protocol buffer defined in descriptor.proto).
The FileDescriptor for each of the PROTO_FILES
provided will be loaded from these
FileDescriptorSets. If a FileDescriptor
appears multiple times, the first occurrence
will be used.
-oFILE, Writes a FileDescriptorSet (a protocol buffer,
--descriptor_set_out=FILE defined in descriptor.proto) containing all of
the input files to FILE.
--include_imports When using --descriptor_set_out, also include
all dependencies of the input files in the
set, so that the set is self-contained.
--include_source_info When using --descriptor_set_out, do not strip
SourceCodeInfo from the FileDescriptorProto.
This results in vastly larger descriptors that
include information about the original
location of each decl in the source file as
well as surrounding comments.
--dependency_out=FILE Write a dependency output file in the format
expected by make. This writes the transitive
set of input file paths to FILE
--error_format=FORMAT Set the format in which to print errors.
FORMAT may be 'gcc' (the default) or 'msvs'
(Microsoft Visual Studio format).
--print_free_field_numbers Print the free field numbers of the messages
defined in the given proto files. Groups share
the same field number space with the parent
message. Extension ranges are counted as
occupied fields numbers.
--plugin=EXECUTABLE Specifies a plugin executable to use.
Normally, protoc searches the PATH for
plugins, but you may specify additional
executables not in the path using this flag.
Additionally, EXECUTABLE may be of the form
NAME=PATH, in which case the given plugin name
is mapped to the given executable even if
the executable's own name differs.
--cpp_out=OUT_DIR Generate C++ header and source.
--csharp_out=OUT_DIR Generate C# source file.
--java_out=OUT_DIR Generate Java source file.
--js_out=OUT_DIR Generate JavaScript source.
--objc_out=OUT_DIR Generate Objective C header and source.
--php_out=OUT_DIR Generate PHP source file.
--python_out=OUT_DIR Generate Python source file.
--ruby_out=OUT_DIR Generate Ruby source file.
@<filename> Read options and filenames from file. If a
relative file path is specified, the file
will be searched in the working directory.
The --proto_path option will not affect how
this argument file is searched. Content of
the file will be expanded in the position of
@<filename> as in the argument list. Note
that shell expansion is not applied to the
content of the file (i.e., you cannot use
quotes, wildcards, escapes, commands, etc.).
Each line corresponds to a single argument,
even if it contains spaces.
- warning
- このスクリプトの中では Goのコードを生成するための
protoc-gen-go
も同時にインストールしています - これがない状態でコードを生成しようとすると次のようなエラーになるので手動でインストールする場合は注意してください。
protoc-gen-go: program not found or is not executable
- このスクリプトの中では Goのコードを生成するための
- syntax
- Protocol Buffer のバージョン
- 現時点(2020-02)では
3
が最新。
- option
protoc
に与えるオプション- 現状 Golang 向けのオプションは
go_package
だけっぽいです。 -
- info
option go_package = "../ptypes";
のように指定して以下のように実行すれば../ptypes/
に pb ファイルが出力されます。-
~/helloworld/helloworld# protoc --go_out=plugins=grpc:. helloworld.proto ~/helloworld/helloworld# ls ../ptypes/ helloworld.pb.go
- ちなみに
--go_out=plugins=grpc:{パス}
を起点に出力されるためgo_package
にフルパスを指定するなら、パスには/
を指定します。
- 参考
- package
- パッケージ名
- (Golangでは)Goファイルのパッケージ名になるため出力先のディレクトリ名と合わせるべき
- import
- 別ファイルの定義を取り込む。
- message
- 通信に利用されるパラメータやレスポンスの型定義
- (Golangでは)Messageに対応する構造体が作られる
- Message の各フィールドは構造体のフィールドに対応する
- フィールドは型を持つ
- 配列(Array)を表現する場合は
repeated
を使う - マップを表現する場合は
map
を使う
- 配列(Array)を表現する場合は
- https://developers.google.com/protocol-buffers/docs/proto3
- service
- 機能をまとめた単位。サービスには RPC が紐づく。
- (Golangでは)定義したServiceの数だけ Server, Client のインタフェースが作られる
- Serviceの rpc フィールドはそれぞれのインタフェースのメソッドとして登録される
- 参考
- https://qiita.com/yugui/items/160737021d25d761b353
⭐️ RPC types
gRPCは4種類の通信方法を提供します。
というと身構えてしまいそうですが、 簡単に言うと「 stream
を指定するかしないか」
「クライアント側で指定するかサーバ側で指定するか」で名称が分かれているだけです。
- 種類
- stream指定
- 例
- Unary RPC
-
sequenceDiagram participant client participant server client->>server: Request server-->>client: Response
- Server streaming RPC
- サーバー
-
sequenceDiagram participant client participant server client->>server: Request server-->>client: Response server-->>client: Response
- Client streaming RPC
- クライアント
-
sequenceDiagram participant client participant server client->>server: Request client->>server: Request server-->>client: Response
- Bidirectional streaming
- 両方
-
sequenceDiagram participant client participant server client->>server: Request server-->>client: Response server-->>client: Response client->>server: Request client->>server: Request client->>server: Request
ではストリーミングとは何かというと、 連続した通信(リクエスト,レスポンス)で、これまでの単発の通信と対比する呼称です。
gRPCはリクエスト、レスポンスともにデータサイズに上限があります。やり取りするデータが可変長で巨大になる可能性がある場合はストリーミングによって通信を分割することが望ましいです。
🌚 Unary RPC
1つのリクエストに対して1つのレスポンスを返す最も基本的なRPCです。
クライアント、サーバともに stream
を指定しなければデフォルトでこれになります。
先程も見ましたが、定義ファイルをもう一度確認しておきましょう。
-
helloworld.proto
syntax = "proto3"; option java_multiple_files = true; option java_package = "io.grpc.examples.helloworld"; option java_outer_classname = "HelloWorldProto"; package helloworld; // The greeting service definition. service Greeter { // Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) {} } // The request message containing the user's name. message HelloRequest { string name = 1; } // The response message containing the greetings message HelloReply { string message = 1; }
-
helloworld.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT. // source: helloworld.proto package helloworld import ( context "context" fmt "fmt" proto "github.com/golang/protobuf/proto" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" math "math" ) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package // The request message containing the user's name. type HelloRequest struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *HelloRequest) Reset() { *m = HelloRequest{} } func (m *HelloRequest) String() string { return proto.CompactTextString(m) } func (*HelloRequest) ProtoMessage() {} func (*HelloRequest) Descriptor() ([]byte, []int) { return fileDescriptor_17b8c58d586b62f2, []int{0} } func (m *HelloRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_HelloRequest.Unmarshal(m, b) } func (m *HelloRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_HelloRequest.Marshal(b, m, deterministic) } func (m *HelloRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_HelloRequest.Merge(m, src) } func (m *HelloRequest) XXX_Size() int { return xxx_messageInfo_HelloRequest.Size(m) } func (m *HelloRequest) XXX_DiscardUnknown() { xxx_messageInfo_HelloRequest.DiscardUnknown(m) } var xxx_messageInfo_HelloRequest proto.InternalMessageInfo func (m *HelloRequest) GetName() string { if m != nil { return m.Name } return "" } // The response message containing the greetings type HelloReply struct { Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *HelloReply) Reset() { *m = HelloReply{} } func (m *HelloReply) String() string { return proto.CompactTextString(m) } func (*HelloReply) ProtoMessage() {} func (*HelloReply) Descriptor() ([]byte, []int) { return fileDescriptor_17b8c58d586b62f2, []int{1} } func (m *HelloReply) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_HelloReply.Unmarshal(m, b) } func (m *HelloReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_HelloReply.Marshal(b, m, deterministic) } func (m *HelloReply) XXX_Merge(src proto.Message) { xxx_messageInfo_HelloReply.Merge(m, src) } func (m *HelloReply) XXX_Size() int { return xxx_messageInfo_HelloReply.Size(m) } func (m *HelloReply) XXX_DiscardUnknown() { xxx_messageInfo_HelloReply.DiscardUnknown(m) } var xxx_messageInfo_HelloReply proto.InternalMessageInfo func (m *HelloReply) GetMessage() string { if m != nil { return m.Message } return "" } func init() { proto.RegisterType((*HelloRequest)(nil), "helloworld.HelloRequest") proto.RegisterType((*HelloReply)(nil), "helloworld.HelloReply") } func init() { proto.RegisterFile("helloworld.proto", fileDescriptor_17b8c58d586b62f2) } var fileDescriptor_17b8c58d586b62f2 = []byte{ // 175 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0xc8, 0x48, 0xcd, 0xc9, 0xc9, 0x2f, 0xcf, 0x2f, 0xca, 0x49, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x42, 0x88, 0x28, 0x29, 0x71, 0xf1, 0x78, 0x80, 0x78, 0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x42, 0x42, 0x5c, 0x2c, 0x79, 0x89, 0xb9, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x60, 0xb6, 0x92, 0x1a, 0x17, 0x17, 0x54, 0x4d, 0x41, 0x4e, 0xa5, 0x90, 0x04, 0x17, 0x7b, 0x6e, 0x6a, 0x71, 0x71, 0x62, 0x3a, 0x4c, 0x11, 0x8c, 0x6b, 0xe4, 0xc9, 0xc5, 0xee, 0x5e, 0x94, 0x9a, 0x5a, 0x92, 0x5a, 0x24, 0x64, 0xc7, 0xc5, 0x11, 0x9c, 0x58, 0x09, 0xd6, 0x25, 0x24, 0xa1, 0x87, 0xe4, 0x02, 0x64, 0xcb, 0xa4, 0xc4, 0xb0, 0xc8, 0x14, 0xe4, 0x54, 0x2a, 0x31, 0x38, 0x19, 0x70, 0x49, 0x67, 0xe6, 0xeb, 0xa5, 0x17, 0x15, 0x24, 0xeb, 0xa5, 0x56, 0x24, 0xe6, 0x16, 0xe4, 0xa4, 0x16, 0x23, 0xa9, 0x75, 0xe2, 0x07, 0x2b, 0x0e, 0x07, 0xb1, 0x03, 0x40, 0x5e, 0x0a, 0x60, 0x4c, 0x62, 0x03, 0xfb, 0xcd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x0f, 0xb7, 0xcd, 0xf2, 0xef, 0x00, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConnInterface // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion6 // GreeterClient is the client API for Greeter service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type GreeterClient interface { // Sends a greeting SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) } type greeterClient struct { cc grpc.ClientConnInterface } func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient { return &greeterClient{cc} } func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) { out := new(HelloReply) err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...) if err != nil { return nil, err } return out, nil } // GreeterServer is the server API for Greeter service. type GreeterServer interface { // Sends a greeting SayHello(context.Context, *HelloRequest) (*HelloReply, error) } // UnimplementedGreeterServer can be embedded to have forward compatible implementations. type UnimplementedGreeterServer struct { } func (*UnimplementedGreeterServer) SayHello(ctx context.Context, req *HelloRequest) (*HelloReply, error) { return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented") } func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { s.RegisterService(&_Greeter_serviceDesc, srv) } func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(HelloRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(GreeterServer).SayHello(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/helloworld.Greeter/SayHello", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest)) } return interceptor(ctx, in, info, handler) } var _Greeter_serviceDesc = grpc.ServiceDesc{ ServiceName: "helloworld.Greeter", HandlerType: (*GreeterServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "SayHello", Handler: _Greeter_SayHello_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "helloworld.proto", }
これらを使ってサーバとクライアントのコードを書いていきます。
-
greeter_server/main.go
package main import ( "context" "log" "net" pb "github.com/righ/grpc-go-example/helloworld/helloworld" "google.golang.org/grpc" ) const ( port = ":50051" ) // server is used to implement helloworld.GreeterServer. type server struct { pb.UnimplementedGreeterServer } // SayHello implements helloworld.GreeterServer func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { log.Printf("Received: %v", in.GetName()) return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
-
greeter_client/main.go
package main import ( "context" "log" "os" "time" pb "github.com/righ/grpc-go-example/helloworld/helloworld" "google.golang.org/grpc" ) const ( address = "grpc-server:50051" defaultName = "world" ) func main() { // Set up a connection to the server. conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewGreeterClient(conn) // Contact the server and print out its response. name := defaultName if len(os.Args) > 1 { name = os.Args[1] } ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name}) if err != nil { log.Fatalf("could not greet: %v", err) } log.Printf("Greeting: %s", r.GetMessage()) }
- これは以降全てに共通することですが、サーバ側のコードでは
pb.Register{Service名}Server
関数を使って「grpcのサーバ」と「サービス」を紐付けるわけですが、 サービスには上述したインタフェースを引数に取るため、構造体には rpc に相当するメソッドが定義されている必要があります。 Unary RPC では サーバ側のRPCメソッド はリクエストのパラメータとして構造体を受け取って レスポンスとして構造体を返却するだけです。 - Unary RPC のクライアント側コードは更にシンプルで
pb.New{Service名}Client
で作成したクライアントから呼び出したい rpc のメソッドを呼び出すだけです。 サーバと違い、レスポンスを返却値として返すだけなのでメソッドを実装する必要はありません。
これは Unary RPC に限った話ではないですが、サーバ側のインタフェースを満たす空の構造体(Unimplemented{サービス名}Server
)が自動的に生成されるので、
それを使えばひとまずサーバとして動くようになります。
今回のexampleのように、これ(UnimplementedServer)を埋め込んだ構造体を自分で定義して、 必要なメソッドを追加していくというのも良さそうです。
type server struct {
pb.UnimplementedGreeterServer
}
実装すべき機能が多い場合は便利ですね。
- warning
Unary RPC
は最も基本的な gRPC ではありますが、後述する RPC と引数が異なるので注意してください。
利用例は最初のセクションを参照してください。
🌜 Server streaming RPC
1つのリクエストに対して、複数(N)のレスポンスを返します。
まずはproto定義と自動生成ファイル。 レスポンスにだけ stream が指定されています。
-
goodnightworld.proto
syntax = "proto3"; package goodnightworld; // The greeting service definition. service Greeter { // Sends a greeting rpc SayGoodnight (GoodnightRequest) returns (stream GoodnightReply) {} } // The request message containing the user's name. message GoodnightRequest { string name = 1; } // The response message containing the greetings message GoodnightReply { string message = 1; }
-
goodnightworld.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT. // source: goodnightworld.proto package goodnightworld import ( context "context" fmt "fmt" proto "github.com/golang/protobuf/proto" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" math "math" ) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package // The request message containing the user's name. type GoodnightRequest struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *GoodnightRequest) Reset() { *m = GoodnightRequest{} } func (m *GoodnightRequest) String() string { return proto.CompactTextString(m) } func (*GoodnightRequest) ProtoMessage() {} func (*GoodnightRequest) Descriptor() ([]byte, []int) { return fileDescriptor_a27943471a8d8b0a, []int{0} } func (m *GoodnightRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_GoodnightRequest.Unmarshal(m, b) } func (m *GoodnightRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_GoodnightRequest.Marshal(b, m, deterministic) } func (m *GoodnightRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_GoodnightRequest.Merge(m, src) } func (m *GoodnightRequest) XXX_Size() int { return xxx_messageInfo_GoodnightRequest.Size(m) } func (m *GoodnightRequest) XXX_DiscardUnknown() { xxx_messageInfo_GoodnightRequest.DiscardUnknown(m) } var xxx_messageInfo_GoodnightRequest proto.InternalMessageInfo func (m *GoodnightRequest) GetName() string { if m != nil { return m.Name } return "" } // The response message containing the greetings type GoodnightReply struct { Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *GoodnightReply) Reset() { *m = GoodnightReply{} } func (m *GoodnightReply) String() string { return proto.CompactTextString(m) } func (*GoodnightReply) ProtoMessage() {} func (*GoodnightReply) Descriptor() ([]byte, []int) { return fileDescriptor_a27943471a8d8b0a, []int{1} } func (m *GoodnightReply) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_GoodnightReply.Unmarshal(m, b) } func (m *GoodnightReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_GoodnightReply.Marshal(b, m, deterministic) } func (m *GoodnightReply) XXX_Merge(src proto.Message) { xxx_messageInfo_GoodnightReply.Merge(m, src) } func (m *GoodnightReply) XXX_Size() int { return xxx_messageInfo_GoodnightReply.Size(m) } func (m *GoodnightReply) XXX_DiscardUnknown() { xxx_messageInfo_GoodnightReply.DiscardUnknown(m) } var xxx_messageInfo_GoodnightReply proto.InternalMessageInfo func (m *GoodnightReply) GetMessage() string { if m != nil { return m.Message } return "" } func init() { proto.RegisterType((*GoodnightRequest)(nil), "goodnightworld.GoodnightRequest") proto.RegisterType((*GoodnightReply)(nil), "goodnightworld.GoodnightReply") } func init() { proto.RegisterFile("goodnightworld.proto", fileDescriptor_a27943471a8d8b0a) } var fileDescriptor_a27943471a8d8b0a = []byte{ // 148 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x49, 0xcf, 0xcf, 0x4f, 0xc9, 0xcb, 0x4c, 0xcf, 0x28, 0x29, 0xcf, 0x2f, 0xca, 0x49, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x43, 0x15, 0x55, 0x52, 0xe3, 0x12, 0x70, 0x87, 0x89, 0x04, 0xa5, 0x16, 0x96, 0xa6, 0x16, 0x97, 0x08, 0x09, 0x71, 0xb1, 0xe4, 0x25, 0xe6, 0xa6, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x81, 0xd9, 0x4a, 0x5a, 0x5c, 0x7c, 0x48, 0xea, 0x0a, 0x72, 0x2a, 0x85, 0x24, 0xb8, 0xd8, 0x73, 0x53, 0x8b, 0x8b, 0x13, 0xd3, 0x61, 0x0a, 0x61, 0x5c, 0xa3, 0x78, 0x2e, 0x76, 0xf7, 0xa2, 0xd4, 0xd4, 0x92, 0xd4, 0x22, 0xa1, 0x10, 0x2e, 0x9e, 0xe0, 0xc4, 0x4a, 0xb8, 0x4e, 0x21, 0x05, 0x3d, 0x34, 0x57, 0xa1, 0x5b, 0x2e, 0x25, 0x87, 0x47, 0x45, 0x41, 0x4e, 0xa5, 0x12, 0x83, 0x01, 0x63, 0x12, 0x1b, 0xd8, 0x2f, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0xae, 0x2b, 0xc8, 0xe3, 0x00, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConnInterface // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion6 // GreeterClient is the client API for Greeter service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type GreeterClient interface { // Sends a greeting SayGoodnight(ctx context.Context, in *GoodnightRequest, opts ...grpc.CallOption) (Greeter_SayGoodnightClient, error) } type greeterClient struct { cc grpc.ClientConnInterface } func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient { return &greeterClient{cc} } func (c *greeterClient) SayGoodnight(ctx context.Context, in *GoodnightRequest, opts ...grpc.CallOption) (Greeter_SayGoodnightClient, error) { stream, err := c.cc.NewStream(ctx, &_Greeter_serviceDesc.Streams[0], "/goodnightworld.Greeter/SayGoodnight", opts...) if err != nil { return nil, err } x := &greeterSayGoodnightClient{stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } if err := x.ClientStream.CloseSend(); err != nil { return nil, err } return x, nil } type Greeter_SayGoodnightClient interface { Recv() (*GoodnightReply, error) grpc.ClientStream } type greeterSayGoodnightClient struct { grpc.ClientStream } func (x *greeterSayGoodnightClient) Recv() (*GoodnightReply, error) { m := new(GoodnightReply) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } // GreeterServer is the server API for Greeter service. type GreeterServer interface { // Sends a greeting SayGoodnight(*GoodnightRequest, Greeter_SayGoodnightServer) error } // UnimplementedGreeterServer can be embedded to have forward compatible implementations. type UnimplementedGreeterServer struct { } func (*UnimplementedGreeterServer) SayGoodnight(req *GoodnightRequest, srv Greeter_SayGoodnightServer) error { return status.Errorf(codes.Unimplemented, "method SayGoodnight not implemented") } func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { s.RegisterService(&_Greeter_serviceDesc, srv) } func _Greeter_SayGoodnight_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(GoodnightRequest) if err := stream.RecvMsg(m); err != nil { return err } return srv.(GreeterServer).SayGoodnight(m, &greeterSayGoodnightServer{stream}) } type Greeter_SayGoodnightServer interface { Send(*GoodnightReply) error grpc.ServerStream } type greeterSayGoodnightServer struct { grpc.ServerStream } func (x *greeterSayGoodnightServer) Send(m *GoodnightReply) error { return x.ServerStream.SendMsg(m) } var _Greeter_serviceDesc = grpc.ServiceDesc{ ServiceName: "goodnightworld.Greeter", HandlerType: (*GreeterServer)(nil), Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { StreamName: "SayGoodnight", Handler: _Greeter_SayGoodnight_Handler, ServerStreams: true, }, }, Metadata: "goodnightworld.proto", }
今回プログラムの概要は以下です。
- クライアントは名前をサーバに一度だけ送信する
- サーバは受け取った名前を使って挨拶を組み立てて3回連続で返却する
- デバッグで標準出力にも表示する
- クライアントはサーバから受け取ったメッセージを標準出力に表示する
実用性は全くありません。
-
greeter_server/main.go
package main import ( "log" "net" pb "github.com/righ/grpc-go-example/goodnightworld/goodnightworld" "google.golang.org/grpc" ) const ( port = ":50051" ) // server is used to implement goodnightworld.GreeterServer. type server struct { pb.UnimplementedGreeterServer } // SayGoodnight implements goodnightworld.GreeterServer func (s *server) SayGoodnight(in *pb.GoodnightRequest, srv pb.Greeter_SayGoodnightServer) error { log.Printf("Received: %s", in.GetName()) srv.Send(&pb.GoodnightReply{Message: "Good night " + in.GetName() + "!"}) srv.Send(&pb.GoodnightReply{Message: "Good night " + in.GetName() + "!"}) srv.Send(&pb.GoodnightReply{Message: "Good night " + in.GetName() + "!"}) return nil } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
-
greeter_client/main.go
package main import ( "context" "io" "log" "time" pb "github.com/righ/grpc-go-example/goodnightworld/goodnightworld" "google.golang.org/grpc" ) const ( address = "grpc-server:50051" ) func main() { conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewGreeterClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() cli, err := c.SayGoodnight(ctx, &pb.GoodnightRequest{Name: "righ"}) if err != nil { log.Fatalf("could not greet: %v", err) return } for { reply, err := cli.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("could not greet: %v", err) } log.Println(reply.Message) } }
- Server streaming RPC の サーバ側のRPCメソッド は Unary RPC とは少し違い、リクエストパラメータは第1引数で受け取ります。 レスポンスは第2引数のServer構造体(以下 srv という)の srv.Send メソッドを使って複数返却できます。 ctx はどこいったの?となりますが srv.Context に入っています。
- Server streaming PRC の クライアント側のRPCメソッドも Unary RPC とは違います。 具体的にはパラメータは引数として受け取らず、メソッドの返却値としてレスポンスではなくRPCのクライアントが返却されます。(以下 cli という) cli.Send メソッドを使いリクエストを送信し(パラメータはここで指定する)、 cli.Recv メソッドを使って res, err := cli.Recv() のようにレスポンスを抽出します。 レスポンスは複数受け取ることになるためループの中で受け取ることになりますが、何らかの形で終了を検知しなければなりません。 これは err が io.EOF と一致するかどうかで判定できるので検知したらループを抜けます。
~/goodnightworld/greeter_server# go run .
~/goodnightworld/greeter_client# go run .
-
2020/02/01 08:13:23 Received: name:"righ"
-
2020/02/01 08:13:23 Good night righ! 2020/02/01 08:13:23 Good night righ! 2020/02/01 08:13:23 Good night righ!
sequenceDiagram
participant client
participant server
client->>server: righ
server-->>client: Good night righ!
server-->>client: Good night righ!
server-->>client: Good night righ!
🌛 Client streaming RPC
複数(M)のリクエストに対して、1つのレスポンスを返します。
まずはproto定義と自動生成ファイル。 リクエストにだけ
stream
が指定されています。
-
goodmorningworld.proto
syntax = "proto3"; package goodmorningworld; // The greeting service definition. service Greeter { // Sends a greeting rpc SayGoodmorning (stream GoodmorningRequest) returns (GoodmorningReply) {} } // The request message containing the user's name. message GoodmorningRequest { string name = 1; } // The response message containing the greetings message GoodmorningReply { string message = 1; }
-
goodmorningworld.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT. // source: goodmorningworld.proto package goodmorningworld import ( context "context" fmt "fmt" proto "github.com/golang/protobuf/proto" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" math "math" ) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package // The request message containing the user's name. type GoodmorningRequest struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *GoodmorningRequest) Reset() { *m = GoodmorningRequest{} } func (m *GoodmorningRequest) String() string { return proto.CompactTextString(m) } func (*GoodmorningRequest) ProtoMessage() {} func (*GoodmorningRequest) Descriptor() ([]byte, []int) { return fileDescriptor_d4e0dee62205cbf6, []int{0} } func (m *GoodmorningRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_GoodmorningRequest.Unmarshal(m, b) } func (m *GoodmorningRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_GoodmorningRequest.Marshal(b, m, deterministic) } func (m *GoodmorningRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_GoodmorningRequest.Merge(m, src) } func (m *GoodmorningRequest) XXX_Size() int { return xxx_messageInfo_GoodmorningRequest.Size(m) } func (m *GoodmorningRequest) XXX_DiscardUnknown() { xxx_messageInfo_GoodmorningRequest.DiscardUnknown(m) } var xxx_messageInfo_GoodmorningRequest proto.InternalMessageInfo func (m *GoodmorningRequest) GetName() string { if m != nil { return m.Name } return "" } // The response message containing the greetings type GoodmorningReply struct { Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *GoodmorningReply) Reset() { *m = GoodmorningReply{} } func (m *GoodmorningReply) String() string { return proto.CompactTextString(m) } func (*GoodmorningReply) ProtoMessage() {} func (*GoodmorningReply) Descriptor() ([]byte, []int) { return fileDescriptor_d4e0dee62205cbf6, []int{1} } func (m *GoodmorningReply) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_GoodmorningReply.Unmarshal(m, b) } func (m *GoodmorningReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_GoodmorningReply.Marshal(b, m, deterministic) } func (m *GoodmorningReply) XXX_Merge(src proto.Message) { xxx_messageInfo_GoodmorningReply.Merge(m, src) } func (m *GoodmorningReply) XXX_Size() int { return xxx_messageInfo_GoodmorningReply.Size(m) } func (m *GoodmorningReply) XXX_DiscardUnknown() { xxx_messageInfo_GoodmorningReply.DiscardUnknown(m) } var xxx_messageInfo_GoodmorningReply proto.InternalMessageInfo func (m *GoodmorningReply) GetMessage() string { if m != nil { return m.Message } return "" } func init() { proto.RegisterType((*GoodmorningRequest)(nil), "goodmorningworld.GoodmorningRequest") proto.RegisterType((*GoodmorningReply)(nil), "goodmorningworld.GoodmorningReply") } func init() { proto.RegisterFile("goodmorningworld.proto", fileDescriptor_d4e0dee62205cbf6) } var fileDescriptor_d4e0dee62205cbf6 = []byte{ // 150 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4b, 0xcf, 0xcf, 0x4f, 0xc9, 0xcd, 0x2f, 0xca, 0xcb, 0xcc, 0x4b, 0x2f, 0xcf, 0x2f, 0xca, 0x49, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0x40, 0x17, 0x57, 0xd2, 0xe0, 0x12, 0x72, 0x47, 0x88, 0x05, 0xa5, 0x16, 0x96, 0xa6, 0x16, 0x97, 0x08, 0x09, 0x71, 0xb1, 0xe4, 0x25, 0xe6, 0xa6, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x81, 0xd9, 0x4a, 0x3a, 0x5c, 0x02, 0x28, 0x2a, 0x0b, 0x72, 0x2a, 0x85, 0x24, 0xb8, 0xd8, 0x73, 0x53, 0x8b, 0x8b, 0x13, 0xd3, 0x61, 0x4a, 0x61, 0x5c, 0xa3, 0x4c, 0x2e, 0x76, 0xf7, 0xa2, 0xd4, 0xd4, 0x92, 0xd4, 0x22, 0xa1, 0x38, 0x2e, 0xbe, 0xe0, 0xc4, 0x4a, 0x24, 0xbd, 0x42, 0x2a, 0x7a, 0x18, 0xee, 0xc3, 0x74, 0x84, 0x94, 0x12, 0x01, 0x55, 0x05, 0x39, 0x95, 0x4a, 0x0c, 0x1a, 0x8c, 0x49, 0x6c, 0x60, 0xbf, 0x19, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x89, 0x33, 0x30, 0x81, 0xf5, 0x00, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConnInterface // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion6 // GreeterClient is the client API for Greeter service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type GreeterClient interface { // Sends a greeting SayGoodmorning(ctx context.Context, opts ...grpc.CallOption) (Greeter_SayGoodmorningClient, error) } type greeterClient struct { cc grpc.ClientConnInterface } func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient { return &greeterClient{cc} } func (c *greeterClient) SayGoodmorning(ctx context.Context, opts ...grpc.CallOption) (Greeter_SayGoodmorningClient, error) { stream, err := c.cc.NewStream(ctx, &_Greeter_serviceDesc.Streams[0], "/goodmorningworld.Greeter/SayGoodmorning", opts...) if err != nil { return nil, err } x := &greeterSayGoodmorningClient{stream} return x, nil } type Greeter_SayGoodmorningClient interface { Send(*GoodmorningRequest) error CloseAndRecv() (*GoodmorningReply, error) grpc.ClientStream } type greeterSayGoodmorningClient struct { grpc.ClientStream } func (x *greeterSayGoodmorningClient) Send(m *GoodmorningRequest) error { return x.ClientStream.SendMsg(m) } func (x *greeterSayGoodmorningClient) CloseAndRecv() (*GoodmorningReply, error) { if err := x.ClientStream.CloseSend(); err != nil { return nil, err } m := new(GoodmorningReply) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } // GreeterServer is the server API for Greeter service. type GreeterServer interface { // Sends a greeting SayGoodmorning(Greeter_SayGoodmorningServer) error } // UnimplementedGreeterServer can be embedded to have forward compatible implementations. type UnimplementedGreeterServer struct { } func (*UnimplementedGreeterServer) SayGoodmorning(srv Greeter_SayGoodmorningServer) error { return status.Errorf(codes.Unimplemented, "method SayGoodmorning not implemented") } func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { s.RegisterService(&_Greeter_serviceDesc, srv) } func _Greeter_SayGoodmorning_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(GreeterServer).SayGoodmorning(&greeterSayGoodmorningServer{stream}) } type Greeter_SayGoodmorningServer interface { SendAndClose(*GoodmorningReply) error Recv() (*GoodmorningRequest, error) grpc.ServerStream } type greeterSayGoodmorningServer struct { grpc.ServerStream } func (x *greeterSayGoodmorningServer) SendAndClose(m *GoodmorningReply) error { return x.ServerStream.SendMsg(m) } func (x *greeterSayGoodmorningServer) Recv() (*GoodmorningRequest, error) { m := new(GoodmorningRequest) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } return m, nil } var _Greeter_serviceDesc = grpc.ServiceDesc{ ServiceName: "goodmorningworld.Greeter", HandlerType: (*GreeterServer)(nil), Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { StreamName: "SayGoodmorning", Handler: _Greeter_SayGoodmorning_Handler, ClientStreams: true, }, }, Metadata: "goodmorningworld.proto", }
今回作成するプログラムの概要は以下です。
- クライアントは標準入力から名前を受け取り、一つずつサーバに送信する
- 空行が入力されるとクライアント側の入力は完了とする
- サーバはクライアントから受け取った名前を使い挨拶を組み立ててクライアントに返却する
- 受け取った名前をデバッグ用に標準出力に表示する
- クライアントはサーバから受け取った挨拶を標準出力に表示する
実用性はない。
-
greeter_server/main.go
package main import ( "io" "log" "net" "strings" pb "github.com/righ/grpc-go-example/goodmorningworld/goodmorningworld" "google.golang.org/grpc" ) const ( port = ":50051" ) // server is used to implement goodmorningworld.GreeterServer. type server struct { pb.UnimplementedGreeterServer } // SayGoodmorning implements goodmorningworld.GreeterServer func (s *server) SayGoodmorning(srv pb.Greeter_SayGoodmorningServer) error { names := []string{} for { req, err := srv.Recv() if err == io.EOF { break } if err != nil { return err } log.Printf("Received: %s", req) names = append(names, req.GetName()) } message := strings.Join(names[:], ",") srv.SendAndClose(&pb.GoodmorningReply{Message: "Good morning " + message + "!"}) return nil } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
-
greeter_client/main.go
package main import ( "bufio" "context" "fmt" "log" "os" "time" pb "github.com/righ/grpc-go-example/goodmorningworld/goodmorningworld" "google.golang.org/grpc" ) const ( address = "grpc-server:50051" ) func main() { conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewGreeterClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() cli, err := c.SayGoodmorning(ctx) if err != nil { log.Fatalf("could not greet: %v", err) return } scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { name := scanner.Text() if name == "" { break } if err := cli.Send(&pb.GoodmorningRequest{Name: name}); err != nil { log.Fatalf("Send failed: %v", err) } } reply, err := cli.CloseAndRecv() if err != nil { log.Fatalf("could not greet: %v", err) return } fmt.Println(reply.GetMessage()) }
- Client streaming RPC のサーバ側RPCメソッドは Server構造体(以下 srv という)を引数として、 srv.Recv メソッドを使い req, err := srv.Recv() のようにリクエストを抽出するように実装します。 リクエストは複数受け取ることになるため、先程のクライアントサイドと同様に err が io.EOF のときにループを抜けるようにします。 レスポンスは srv.SendAndClose を使って返却します。
- Client streaming PRC の クライアント側RPCメソッドはRPCのクライアントが返却されます。(以下 cli という)
cli.Send メソッドを使いリクエストを送信し(パラメータはここで指定する)、
cli.CloseAndRecv メソッドを使って
res, err := cli.CloseAndRecv()
のようにサーバからのレスポンスを抽出し、通信を終了します。
~/goodmorningworld/greeter_server# go run .
~/goodmorningworld/greeter_client# go run .
-
2020/02/01 08:50:44 Received: name:"dj" 2020/02/01 08:50:44 Received: name:"steph" 2020/02/01 08:50:44 Received: name:"michelle"
-
root@98337ba2f5a0:~/goodmorningworld/greeter_client# go run . dj steph michelle Good morning dj,steph,michelle!
sequenceDiagram
participant client
participant server
client->>server: dj
client->>server: setph
client->>server: michelle
server-->>client: Good morning dj,steph,michelle!
🌝 Bidirectional streaming RPC
複数(N)のリクエストに対して、複数(M)のレスポンスを返すことで双方向の通信を実現します。
Bidirectional streaming RPC - gRPC
まずはproto定義と自動生成ファイル。
RPCのリクエスト・レスポンスともに stream
を指定しました。
-
chat.proto
syntax = "proto3"; package chat; service Chat { rpc Talk (stream MessageRequest) returns (stream MessageReply) {} } message MessageRequest { string name = 1; string message = 2; } message MessageReply { string name = 1; string message = 2; }
-
chat.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT. // source: chat.proto package chat import ( context "context" fmt "fmt" proto "github.com/golang/protobuf/proto" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" math "math" ) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package type MessageRequest struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *MessageRequest) Reset() { *m = MessageRequest{} } func (m *MessageRequest) String() string { return proto.CompactTextString(m) } func (*MessageRequest) ProtoMessage() {} func (*MessageRequest) Descriptor() ([]byte, []int) { return fileDescriptor_8c585a45e2093e54, []int{0} } func (m *MessageRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MessageRequest.Unmarshal(m, b) } func (m *MessageRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_MessageRequest.Marshal(b, m, deterministic) } func (m *MessageRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_MessageRequest.Merge(m, src) } func (m *MessageRequest) XXX_Size() int { return xxx_messageInfo_MessageRequest.Size(m) } func (m *MessageRequest) XXX_DiscardUnknown() { xxx_messageInfo_MessageRequest.DiscardUnknown(m) } var xxx_messageInfo_MessageRequest proto.InternalMessageInfo func (m *MessageRequest) GetName() string { if m != nil { return m.Name } return "" } func (m *MessageRequest) GetMessage() string { if m != nil { return m.Message } return "" } type MessageReply struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *MessageReply) Reset() { *m = MessageReply{} } func (m *MessageReply) String() string { return proto.CompactTextString(m) } func (*MessageReply) ProtoMessage() {} func (*MessageReply) Descriptor() ([]byte, []int) { return fileDescriptor_8c585a45e2093e54, []int{1} } func (m *MessageReply) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MessageReply.Unmarshal(m, b) } func (m *MessageReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_MessageReply.Marshal(b, m, deterministic) } func (m *MessageReply) XXX_Merge(src proto.Message) { xxx_messageInfo_MessageReply.Merge(m, src) } func (m *MessageReply) XXX_Size() int { return xxx_messageInfo_MessageReply.Size(m) } func (m *MessageReply) XXX_DiscardUnknown() { xxx_messageInfo_MessageReply.DiscardUnknown(m) } var xxx_messageInfo_MessageReply proto.InternalMessageInfo func (m *MessageReply) GetName() string { if m != nil { return m.Name } return "" } func (m *MessageReply) GetMessage() string { if m != nil { return m.Message } return "" } func init() { proto.RegisterType((*MessageRequest)(nil), "chat.MessageRequest") proto.RegisterType((*MessageReply)(nil), "chat.MessageReply") } func init() { proto.RegisterFile("chat.proto", fileDescriptor_8c585a45e2093e54) } var fileDescriptor_8c585a45e2093e54 = []byte{ // 140 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4a, 0xce, 0x48, 0x2c, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x01, 0xb1, 0x95, 0xec, 0xb8, 0xf8, 0x7c, 0x53, 0x8b, 0x8b, 0x13, 0xd3, 0x53, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84, 0x84, 0xb8, 0x58, 0xf2, 0x12, 0x73, 0x53, 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, 0xc0, 0x6c, 0x21, 0x09, 0x2e, 0xf6, 0x5c, 0x88, 0x2a, 0x09, 0x26, 0xb0, 0x30, 0x8c, 0xab, 0x64, 0xc3, 0xc5, 0x03, 0xd7, 0x5f, 0x90, 0x53, 0x49, 0x9a, 0x6e, 0x23, 0x3b, 0x2e, 0x16, 0xe7, 0x8c, 0xc4, 0x12, 0x21, 0x33, 0x2e, 0x96, 0x90, 0xc4, 0x9c, 0x6c, 0x21, 0x11, 0x3d, 0xb0, 0x03, 0x51, 0x5d, 0x24, 0x25, 0x84, 0x26, 0x5a, 0x90, 0x53, 0xa9, 0xc4, 0xa0, 0xc1, 0x68, 0xc0, 0x98, 0xc4, 0x06, 0xf6, 0x8a, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x23, 0x97, 0x5d, 0x87, 0xd8, 0x00, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConnInterface // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion6 // ChatClient is the client API for Chat service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type ChatClient interface { Talk(ctx context.Context, opts ...grpc.CallOption) (Chat_TalkClient, error) } type chatClient struct { cc grpc.ClientConnInterface } func NewChatClient(cc grpc.ClientConnInterface) ChatClient { return &chatClient{cc} } func (c *chatClient) Talk(ctx context.Context, opts ...grpc.CallOption) (Chat_TalkClient, error) { stream, err := c.cc.NewStream(ctx, &_Chat_serviceDesc.Streams[0], "/chat.Chat/Talk", opts...) if err != nil { return nil, err } x := &chatTalkClient{stream} return x, nil } type Chat_TalkClient interface { Send(*MessageRequest) error Recv() (*MessageReply, error) grpc.ClientStream } type chatTalkClient struct { grpc.ClientStream } func (x *chatTalkClient) Send(m *MessageRequest) error { return x.ClientStream.SendMsg(m) } func (x *chatTalkClient) Recv() (*MessageReply, error) { m := new(MessageReply) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } // ChatServer is the server API for Chat service. type ChatServer interface { Talk(Chat_TalkServer) error } // UnimplementedChatServer can be embedded to have forward compatible implementations. type UnimplementedChatServer struct { } func (*UnimplementedChatServer) Talk(srv Chat_TalkServer) error { return status.Errorf(codes.Unimplemented, "method Talk not implemented") } func RegisterChatServer(s *grpc.Server, srv ChatServer) { s.RegisterService(&_Chat_serviceDesc, srv) } func _Chat_Talk_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(ChatServer).Talk(&chatTalkServer{stream}) } type Chat_TalkServer interface { Send(*MessageReply) error Recv() (*MessageRequest, error) grpc.ServerStream } type chatTalkServer struct { grpc.ServerStream } func (x *chatTalkServer) Send(m *MessageReply) error { return x.ServerStream.SendMsg(m) } func (x *chatTalkServer) Recv() (*MessageRequest, error) { m := new(MessageRequest) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } return m, nil } var _Chat_serviceDesc = grpc.ServiceDesc{ ServiceName: "chat.Chat", HandlerType: (*ChatServer)(nil), Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { StreamName: "Talk", Handler: _Chat_Talk_Handler, ServerStreams: true, ClientStreams: true, }, }, Metadata: "chat.proto", }
双方向ということですが、私の想像力が乏しいせいで例題がチャットくらいしか思い浮かばなかったのでかなり不完全ではありますがとりあえず作ってみました。
今回作成するプログラムの概要は以下のようになっています。
- クライアントは
標準入力から受け取ったメッセージ
と自分の名前
をサーバに送信する- 自分の名前はクライアントプログラム開始時にオプションで指定する
- サーバはクライアントから受信した
名前
とメッセージ
を受信時刻
とともに保持する- デバッグ用に標準出力にも表示する
- サーバはクライアントからメッセージを受信するたびに、クライアントごとに保持している最終受信時刻より後に受け取ったメッセージをすべてクライアントに送信する
- クライアントはサーバから受け取ったメッセージを標準出力に表示する
続いてプログラム。これまでの中では一番複雑ですが、せいぜい60行くらいです。
-
chat_server/main.go
package main import ( "log" "net" "time" pb "github.com/righ/grpc-go-example/chat/chat" "google.golang.org/grpc" ) const ( port = ":50051" ) type message struct { name string text string time time.Time } var messages = []message{} type server struct { pb.UnimplementedChatServer } func (s *server) Talk(srv pb.Chat_TalkServer) error { lastRead := time.Now() for { req, err := srv.Recv() if err != nil { return err } name, msg := req.GetName(), req.GetMessage() log.Printf("%s>: %s", name, msg) for _, m := range messages { if m.time.After(lastRead) { srv.Send(&pb.MessageReply{Name: m.name, Message: m.text}) } } lastRead = time.Now() messages = append(messages, message{name, msg, lastRead}) } } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterChatServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
-
chat_client/main.go
package main import ( "bufio" "context" "flag" "io" "log" "os" pb "github.com/righ/grpc-go-example/chat/chat" "google.golang.org/grpc" ) const ( address = "grpc-server:50051" ) func main() { var ( name = flag.String("name", "noname", "user name") ) flag.Parse() conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewChatClient(conn) ctx := context.Background() cli, err := c.Talk(ctx) if err != nil { log.Fatalf("could not talk: %v", err) return } go func() { scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { message := scanner.Text() if message == "" { break } if err := cli.Send(&pb.MessageRequest{Name: *name, Message: message}); err != nil { log.Fatalf("Send failed: %v", err) } } }() for { reply, err := cli.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("could not chat: %v", err) } log.Printf("%s> %s", reply.Name, reply.Message) } }
- Bidirectional streaming RPC のサーバ側のRPCメソッドは Server構造体(以下 srv という)を受け取るように実装します。
リクエストを複数受け取るためループ内で srv.Recv メソッドを使い
req, err := srv.Recv()
のようにリクエストを抽出します。 レスポンスは srv.Send メソッドを使って複数返却できます。1個でもいいし、返却しなくても良いでしょう。 - Bidirectional streaming PRC の クライアント側RPCメソッドはRPCのクライアントを返却します。(以下 cli という)
リクエストは複数送信できるためループ内で cli.Send メソッドを使い送信し(パラメータはここで指定する)、
レスポンスも複数受信するためループ内で cli.Recv メソッドを使って
res, err := cli.Recv()
のように抽出します。 今回は 「送信(入力)」と「受信」の両方を待ち受ける必要があるため、「送信(入力)」の方をゴルーチンでバックグラウンド実行させています。
多少チャットらしくするため、今回はクライアントを2つ起動して残念な会話を繰り広げてみます
~/chat/chat_server# go run .
~/chat/chat_client# go run . -name Karen
~/chat/chat_client# go run . -name Yoko
-
2020/02/01 07:36:46 Karen>: はじめまして 2020/02/01 07:36:49 Karen>: かれんです 2020/02/01 07:36:54 Yoko>: こんにちは 2020/02/01 07:36:59 Yoko>: 用高です 2020/02/01 07:37:41 Karen>: なんて呼んだらいいですか? 2020/02/01 07:37:52 Yoko>: かれんさん、いい名前ですね!
-
はじめまして かれんです なんて呼んだらいいですか? 2020/02/01 07:37:41 Yoko> こんにちは 2020/02/01 07:37:41 Yoko> 用高です
-
こんにちは 2020/02/01 07:36:54 Karen> はじめまして 2020/02/01 07:36:54 Karen> かれんです 用高です かれんさん、いい名前ですね! 2020/02/01 07:37:52 Karen> なんて呼んだらいいですか?
(会話はここで途切れている
sequenceDiagram
participant client1
participant server
participant client2
client1->>server: はじめまして
client1->>server: かれんです
client2->>server: こんにちわ
client2->>server: 用高です
client1->>server: なんて呼んだらいいですか?
server->>client1: こんにちわ
server->>client1: 用高です
client2->>server: かれんさん、いい名前ですね!
server->>client2: なんて呼んだらいいですか?
お気付きの通り、双方向通信ではあるもののリアルタイム通信ではないので使い勝手はお世辞にも良いとは言えません。
本来はメッセージが届いた時点でつながっている全クライアントにブロードキャストできたらよかったんですが、 gRPCは現状で 任意 のクライアントに対してデータを送信する機能を提供していません。
サーバはクライアントからRPC接続を受けるたびにゴルーチンを作り、該当するRPCメソッドはバックグラウンド実行されます。 ここで作られたゴルーチンは接続してきたクライアントしか知らないので、現実装のRPCでは「受信(srv.Recv)」が唯一のメッセージ同期のトリガーとなってしまっているわけです。
🌞 改良する
(このセクションはgRPCとはそこまで関係ないので読み飛ばしてOKです)
じゃあどうすればよいのか。 一言でいうと「送信と受信のRPCを分ける」が答えです。
今回は先ほどとは趣向を変えて、チャネルをメッセージキューのように使ってみようと思います。
というわけで「メッセージ送信RPC」「メッセージ受信RPC」「チャネル作成RPC」を用意します。
-
chat2.proto
syntax = "proto3"; package chat2; service Chat { rpc SendMessages (stream MessageRequest) returns (Null) {} rpc GetMessages (User) returns (stream MessageReply) {} rpc CreateChannel (Null) returns (User) {} } message Null {}; message User { uint64 id = 1; } message MessageRequest { uint64 id = 1; string name = 2; string message = 3; } message MessageReply { string name = 1; string message = 2; }
-
chat2.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT. // source: chat2.proto package chat2 import ( context "context" fmt "fmt" proto "github.com/golang/protobuf/proto" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" math "math" ) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package type Null struct { XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *Null) Reset() { *m = Null{} } func (m *Null) String() string { return proto.CompactTextString(m) } func (*Null) ProtoMessage() {} func (*Null) Descriptor() ([]byte, []int) { return fileDescriptor_acdb35e63791909c, []int{0} } func (m *Null) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Null.Unmarshal(m, b) } func (m *Null) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_Null.Marshal(b, m, deterministic) } func (m *Null) XXX_Merge(src proto.Message) { xxx_messageInfo_Null.Merge(m, src) } func (m *Null) XXX_Size() int { return xxx_messageInfo_Null.Size(m) } func (m *Null) XXX_DiscardUnknown() { xxx_messageInfo_Null.DiscardUnknown(m) } var xxx_messageInfo_Null proto.InternalMessageInfo type User struct { Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *User) Reset() { *m = User{} } func (m *User) String() string { return proto.CompactTextString(m) } func (*User) ProtoMessage() {} func (*User) Descriptor() ([]byte, []int) { return fileDescriptor_acdb35e63791909c, []int{1} } func (m *User) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_User.Unmarshal(m, b) } func (m *User) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_User.Marshal(b, m, deterministic) } func (m *User) XXX_Merge(src proto.Message) { xxx_messageInfo_User.Merge(m, src) } func (m *User) XXX_Size() int { return xxx_messageInfo_User.Size(m) } func (m *User) XXX_DiscardUnknown() { xxx_messageInfo_User.DiscardUnknown(m) } var xxx_messageInfo_User proto.InternalMessageInfo func (m *User) GetId() uint64 { if m != nil { return m.Id } return 0 } type MessageRequest struct { Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` Message string `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *MessageRequest) Reset() { *m = MessageRequest{} } func (m *MessageRequest) String() string { return proto.CompactTextString(m) } func (*MessageRequest) ProtoMessage() {} func (*MessageRequest) Descriptor() ([]byte, []int) { return fileDescriptor_acdb35e63791909c, []int{2} } func (m *MessageRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MessageRequest.Unmarshal(m, b) } func (m *MessageRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_MessageRequest.Marshal(b, m, deterministic) } func (m *MessageRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_MessageRequest.Merge(m, src) } func (m *MessageRequest) XXX_Size() int { return xxx_messageInfo_MessageRequest.Size(m) } func (m *MessageRequest) XXX_DiscardUnknown() { xxx_messageInfo_MessageRequest.DiscardUnknown(m) } var xxx_messageInfo_MessageRequest proto.InternalMessageInfo func (m *MessageRequest) GetId() uint64 { if m != nil { return m.Id } return 0 } func (m *MessageRequest) GetName() string { if m != nil { return m.Name } return "" } func (m *MessageRequest) GetMessage() string { if m != nil { return m.Message } return "" } type MessageReply struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *MessageReply) Reset() { *m = MessageReply{} } func (m *MessageReply) String() string { return proto.CompactTextString(m) } func (*MessageReply) ProtoMessage() {} func (*MessageReply) Descriptor() ([]byte, []int) { return fileDescriptor_acdb35e63791909c, []int{3} } func (m *MessageReply) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MessageReply.Unmarshal(m, b) } func (m *MessageReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_MessageReply.Marshal(b, m, deterministic) } func (m *MessageReply) XXX_Merge(src proto.Message) { xxx_messageInfo_MessageReply.Merge(m, src) } func (m *MessageReply) XXX_Size() int { return xxx_messageInfo_MessageReply.Size(m) } func (m *MessageReply) XXX_DiscardUnknown() { xxx_messageInfo_MessageReply.DiscardUnknown(m) } var xxx_messageInfo_MessageReply proto.InternalMessageInfo func (m *MessageReply) GetName() string { if m != nil { return m.Name } return "" } func (m *MessageReply) GetMessage() string { if m != nil { return m.Message } return "" } func init() { proto.RegisterType((*Null)(nil), "chat2.Null") proto.RegisterType((*User)(nil), "chat2.User") proto.RegisterType((*MessageRequest)(nil), "chat2.MessageRequest") proto.RegisterType((*MessageReply)(nil), "chat2.MessageReply") } func init() { proto.RegisterFile("chat2.proto", fileDescriptor_acdb35e63791909c) } var fileDescriptor_acdb35e63791909c = []byte{ // 232 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xce, 0x48, 0x2c, 0x31, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x73, 0x94, 0xd8, 0xb8, 0x58, 0xfc, 0x4a, 0x73, 0x72, 0x94, 0xc4, 0xb8, 0x58, 0x42, 0x8b, 0x53, 0x8b, 0x84, 0xf8, 0xb8, 0x98, 0x32, 0x53, 0x24, 0x18, 0x15, 0x18, 0x35, 0x58, 0x82, 0x98, 0x32, 0x53, 0x94, 0xfc, 0xb8, 0xf8, 0x7c, 0x53, 0x8b, 0x8b, 0x13, 0xd3, 0x53, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0xd0, 0x55, 0x08, 0x09, 0x71, 0xb1, 0xe4, 0x25, 0xe6, 0xa6, 0x4a, 0x30, 0x29, 0x30, 0x6a, 0x70, 0x06, 0x81, 0xd9, 0x42, 0x12, 0x5c, 0xec, 0xb9, 0x10, 0x5d, 0x12, 0xcc, 0x60, 0x61, 0x18, 0x57, 0xc9, 0x86, 0x8b, 0x07, 0x6e, 0x5e, 0x41, 0x4e, 0x25, 0x5c, 0x37, 0x23, 0x76, 0xdd, 0x4c, 0x28, 0xba, 0x8d, 0x16, 0x30, 0x72, 0xb1, 0x38, 0x67, 0x24, 0x96, 0x08, 0x99, 0x71, 0xf1, 0x04, 0xa7, 0xe6, 0xa5, 0x40, 0x8d, 0x2a, 0x16, 0x12, 0xd5, 0x83, 0xf8, 0x0d, 0xd5, 0xad, 0x52, 0xdc, 0x50, 0x61, 0xb0, 0x17, 0x19, 0x34, 0x18, 0x85, 0x8c, 0xb9, 0xb8, 0xdd, 0x53, 0x4b, 0xe0, 0xda, 0x60, 0xf2, 0x20, 0xaf, 0x4b, 0x09, 0xa3, 0x9b, 0x51, 0x90, 0x53, 0xa9, 0xc4, 0x60, 0xc0, 0x28, 0xa4, 0xcd, 0xc5, 0xeb, 0x5c, 0x94, 0x9a, 0x58, 0x92, 0xea, 0x9c, 0x91, 0x98, 0x97, 0x97, 0x9a, 0x23, 0x84, 0x6c, 0xac, 0x14, 0xb2, 0x19, 0x4a, 0x0c, 0x49, 0x6c, 0xe0, 0xe0, 0x35, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0xfa, 0x38, 0x52, 0xd5, 0x6d, 0x01, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConnInterface // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion6 // ChatClient is the client API for Chat service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type ChatClient interface { SendMessages(ctx context.Context, opts ...grpc.CallOption) (Chat_SendMessagesClient, error) GetMessages(ctx context.Context, in *User, opts ...grpc.CallOption) (Chat_GetMessagesClient, error) CreateChannel(ctx context.Context, in *Null, opts ...grpc.CallOption) (*User, error) } type chatClient struct { cc grpc.ClientConnInterface } func NewChatClient(cc grpc.ClientConnInterface) ChatClient { return &chatClient{cc} } func (c *chatClient) SendMessages(ctx context.Context, opts ...grpc.CallOption) (Chat_SendMessagesClient, error) { stream, err := c.cc.NewStream(ctx, &_Chat_serviceDesc.Streams[0], "/chat2.Chat/SendMessages", opts...) if err != nil { return nil, err } x := &chatSendMessagesClient{stream} return x, nil } type Chat_SendMessagesClient interface { Send(*MessageRequest) error CloseAndRecv() (*Null, error) grpc.ClientStream } type chatSendMessagesClient struct { grpc.ClientStream } func (x *chatSendMessagesClient) Send(m *MessageRequest) error { return x.ClientStream.SendMsg(m) } func (x *chatSendMessagesClient) CloseAndRecv() (*Null, error) { if err := x.ClientStream.CloseSend(); err != nil { return nil, err } m := new(Null) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } func (c *chatClient) GetMessages(ctx context.Context, in *User, opts ...grpc.CallOption) (Chat_GetMessagesClient, error) { stream, err := c.cc.NewStream(ctx, &_Chat_serviceDesc.Streams[1], "/chat2.Chat/GetMessages", opts...) if err != nil { return nil, err } x := &chatGetMessagesClient{stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } if err := x.ClientStream.CloseSend(); err != nil { return nil, err } return x, nil } type Chat_GetMessagesClient interface { Recv() (*MessageReply, error) grpc.ClientStream } type chatGetMessagesClient struct { grpc.ClientStream } func (x *chatGetMessagesClient) Recv() (*MessageReply, error) { m := new(MessageReply) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } func (c *chatClient) CreateChannel(ctx context.Context, in *Null, opts ...grpc.CallOption) (*User, error) { out := new(User) err := c.cc.Invoke(ctx, "/chat2.Chat/CreateChannel", in, out, opts...) if err != nil { return nil, err } return out, nil } // ChatServer is the server API for Chat service. type ChatServer interface { SendMessages(Chat_SendMessagesServer) error GetMessages(*User, Chat_GetMessagesServer) error CreateChannel(context.Context, *Null) (*User, error) } // UnimplementedChatServer can be embedded to have forward compatible implementations. type UnimplementedChatServer struct { } func (*UnimplementedChatServer) SendMessages(srv Chat_SendMessagesServer) error { return status.Errorf(codes.Unimplemented, "method SendMessages not implemented") } func (*UnimplementedChatServer) GetMessages(req *User, srv Chat_GetMessagesServer) error { return status.Errorf(codes.Unimplemented, "method GetMessages not implemented") } func (*UnimplementedChatServer) CreateChannel(ctx context.Context, req *Null) (*User, error) { return nil, status.Errorf(codes.Unimplemented, "method CreateChannel not implemented") } func RegisterChatServer(s *grpc.Server, srv ChatServer) { s.RegisterService(&_Chat_serviceDesc, srv) } func _Chat_SendMessages_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(ChatServer).SendMessages(&chatSendMessagesServer{stream}) } type Chat_SendMessagesServer interface { SendAndClose(*Null) error Recv() (*MessageRequest, error) grpc.ServerStream } type chatSendMessagesServer struct { grpc.ServerStream } func (x *chatSendMessagesServer) SendAndClose(m *Null) error { return x.ServerStream.SendMsg(m) } func (x *chatSendMessagesServer) Recv() (*MessageRequest, error) { m := new(MessageRequest) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } return m, nil } func _Chat_GetMessages_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(User) if err := stream.RecvMsg(m); err != nil { return err } return srv.(ChatServer).GetMessages(m, &chatGetMessagesServer{stream}) } type Chat_GetMessagesServer interface { Send(*MessageReply) error grpc.ServerStream } type chatGetMessagesServer struct { grpc.ServerStream } func (x *chatGetMessagesServer) Send(m *MessageReply) error { return x.ServerStream.SendMsg(m) } func _Chat_CreateChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Null) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(ChatServer).CreateChannel(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/chat2.Chat/CreateChannel", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(ChatServer).CreateChannel(ctx, req.(*Null)) } return interceptor(ctx, in, info, handler) } var _Chat_serviceDesc = grpc.ServiceDesc{ ServiceName: "chat2.Chat", HandlerType: (*ChatServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "CreateChannel", Handler: _Chat_CreateChannel_Handler, }, }, Streams: []grpc.StreamDesc{ { StreamName: "SendMessages", Handler: _Chat_SendMessages_Handler, ClientStreams: true, }, { StreamName: "GetMessages", Handler: _Chat_GetMessages_Handler, ServerStreams: true, }, }, Metadata: "chat2.proto", }
今回のプログラムの概要は以下のようになっています。
- クライアントは「チャネル作成RPC(Unary
RPC)」経由でサーバにアクセスする
- サーバ側はIDとチャネルの組を作成し、クライアントにIDを返却する
- クライアントは標準入力から入力されたメッセージと名前を
ID
とともに「メッセージ送信RPC(Client streaming RPC)」に送信する- サーバは受け取った
ID
に 紐付かない チャネル全てに対し、受け取った「名前とメッセージ」を送信する- このときクライアントには何も返却しない
- サーバは受け取った
- クライアントは「メッセージ受信RPC(Server streaming RPC)」に
ID
を送信し、サーバからメッセージを受信するたびに標準出力に表示する- この処理はクライアント側でもバックグラウンド実行 され続ける
- サーバは受け取った
ID
に 紐づく チャネルからメッセージを取り出すたびにクライアントに送信する
-
chat_server/main.go
package main import ( "context" "io" "log" "net" "time" pb "github.com/righ/grpc-go-example/chat2/chat2" "google.golang.org/grpc" ) const ( port = ":50051" ) type message struct { name string text string time time.Time } var serial = 1 var channels = make(map[uint64]chan message) type server struct { pb.UnimplementedChatServer } func (s *server) CreateChannel(_ context.Context, u *pb.Null) (*pb.User, error) { serial++ channels[uint64(serial)] = make(chan message) return &pb.User{Id: uint64(serial)}, nil } func (s *server) SendMessages(srv pb.Chat_SendMessagesServer) error { for { req, err := srv.Recv() if err != nil { srv.SendAndClose(&pb.Null{}) if err == io.EOF { break } return err } name, msg := req.GetName(), req.GetMessage() log.Printf("%s>: %s", name, msg) now := time.Now() for id, c := range channels { if id == req.Id { continue } go func(c chan message) { c <- message{name, msg, now} }(c) } } return nil } func (s *server) GetMessages(user *pb.User, srv pb.Chat_GetMessagesServer) error { for { m := <-channels[user.Id] if err := srv.Send(&pb.MessageReply{Name: m.name, Message: m.text}); err != nil { return err } } } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterChatServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
-
chat_client/main.go
package main import ( "bufio" "context" "flag" "io" "log" "os" pb "github.com/righ/grpc-go-example/chat2/chat2" "google.golang.org/grpc" ) const ( address = "grpc-server:50051" ) func main() { var ( name = flag.String("name", "noname", "user name") ) flag.Parse() conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewChatClient(conn) ctx := context.Background() user, err := c.CreateChannel(ctx, &pb.Null{}) if err != nil { log.Fatalf("could not send: %v", err) return } go func() { cli, err := c.GetMessages(context.Background(), &pb.User{Id: user.Id}) if err != nil { log.Fatalf("could not get: %v", err) return } for { reply, err := cli.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("Recv failed: %v", err) return } log.Printf("%s> %s", reply.Name, reply.Message) } }() cli, err := c.SendMessages(ctx) if err != nil { log.Fatalf("could not send: %v", err) return } scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { message := scanner.Text() if message == "" { continue } if err := cli.Send(&pb.MessageRequest{Id: user.Id, Name: *name, Message: message}); err != nil { log.Fatalf("Send failed: %v", err) } } cli.CloseAndRecv() }
今回のプログラムでは CreateChannel RPCにアクセスするたびにチャネルを作成し、 それに対応するIDをクライアントに伝えます。いわばセッションIDのようなものです(セキュリティは度外視)。
クライアントはこのIDとメッセージを一緒にメッセージ送信RPCに送信し、受け取ったサーバはIDに紐付かない(つまり自分以外の)チャネルにメッセージを送信します。
クライアントはこれに並行してメッセージ受信RPCにアクセスしておきます。サーバ側のメッセージ受信RPCはチャネルにメッセージが送られてきたタイミングでクライアントにレスポンスを返却するため、リアルタイムでメッセージが送受信されているように見えるというわけですね。
ただし、このプログラムはリソース競合やメモリリークを考慮していないのでそのまま使うのはおすすめできません。
~/chat2/chat_server# go run .
~/chat2/chat_client# go run . -name Karen
~/chat2/chat_client# go run . -name Yoko
-
2020/02/01 16:58:21 Oji> かれんちゃん、オッハー😃♥ 😃✋😍ちょっと電話できるカナ😜⁉️✋❓❗❓水曜日、会社がお休みになった、よ(^з<)😆かれんちゃんは都合どうかな( ̄ー ̄?) ドライブ🚗どウ(^з<)😃♥ ナンチャッテ(^_^)(^o^)❗(笑) 2020/02/01 16:58:47 Karen> 水曜日は風邪引くから無理かも! 2020/02/01 16:59:16 Oji> くれぐれも体調に気をつけて( ̄▽ ̄)(^^ 🤑😤ゆっくり、身体休めテネ(笑)オヤスミナサイ🙂
-
2020/02/01 16:58:21 Oji> かれんちゃん、オッハー😃♥ 😃✋😍ちょっと電話できるカナ😜⁉️✋❓❗❓水曜日、会社がお休みになった、よ(^з<)😆かれんちゃんは都合どうかな( ̄ー ̄?)ドライブ🚗どウ(^з<)😃♥ ナンチャッテ(^_^)(^o^)❗(笑) 水曜日は風邪引くから無理かも! 2020/02/01 16:59:16 Oji> くれぐれも体調に気をつけて( ̄▽ ̄)(^^ 🤑😤ゆっくり、身体休めテネ(笑)オヤスミナサイ🙂
-
かれんちゃん、オッハー😃♥ 😃✋😍ちょっと電話できるカナ😜⁉️✋❓❗❓水曜日、会社がお休みになった、よ(^з<)😆かれんちゃんは都合どうかな( ̄ー ̄?)ドライブ🚗どウ(^з<)😃♥ ナンチャッテ(^_^)(^o^)❗(笑) 2020/02/01 16:58:47 Karen> 水曜日は風邪引くから無理かも! くれぐれも体調に気をつけて( ̄▽ ̄)(^^ 🤑😤ゆっくり、身体休めテネ(笑)オヤスミナサイ🙂
sequenceDiagram
participant client1
participant server
participant client2
client2->>server: かれんちゃん、オッハー😃♥ 😃✋😍<br/>ちょっと電話できるカナ😜⁉️✋❓❗❓<br/>水曜日、会社がお休みになった、よ(^з<)😆<br/>かれんちゃんは都合どうかな( ̄ー ̄?)<br/> ドライブ🚗どウ(^з<)😃♥ <br/>ナンチャッテ(^_^)(^o^)❗(笑)
server->>client1: かれんちゃん、オッハー😃♥ 😃✋😍<br/>ちょっと電話できるカナ😜⁉️✋❓❗❓<br/>水曜日、会社がお休みになった、よ(^з<)😆<br/>かれんちゃんは都合どうかな( ̄ー ̄?)<br/> ドライブ🚗どウ(^з<)😃♥ <br/>ナンチャッテ(^_^)(^o^)❗(笑)
client1->>server: 水曜日は風邪引くから無理かも!
server->>client2: 水曜日は風邪引くから無理かも!
client2->>server: くれぐれも体調に気をつけて( ̄▽ ̄)(^^ 🤑😤<br/>ゆっくり、身体休めテネ(笑)オヤスミナサイ🙂
server->>client1: くれぐれも体調に気をつけて( ̄▽ ̄)(^^ 🤑😤<br/>ゆっくり、身体休めテネ(笑)オヤスミナサイ🙂
今回はちゃんと会話が噛み合っていますね!
👽 Meta data
メタデータは構造化されていない通信データです。
メインではない何らかの付加情報をやり取りするときに使います。 といってもあまり良い例が思い浮かびませんが、ログなどに記録されるタイムスタンプなどでしょうか。
既存のにメタデータを使ったexampleがあったのでそれを使います。(対象ホストだけ変更)
まずは定義から。この時点で変わったところはありません。
-
echo.proto
syntax = "proto3"; package grpc.examples.echo; option go_package = "google.golang.org/grpc/examples/features/proto/echo"; // EchoRequest is the request for echo. message EchoRequest { string message = 1; } // EchoResponse is the response for echo. message EchoResponse { string message = 1; } // Echo is the echo service. service Echo { // UnaryEcho is unary echo. rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} // ServerStreamingEcho is server side streaming. rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} // ClientStreamingEcho is client side streaming. rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} // BidirectionalStreamingEcho is bidi streaming. rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {} }
-
echo.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT. // source: echo.proto package echo import ( context "context" fmt "fmt" proto "github.com/golang/protobuf/proto" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" math "math" ) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package // EchoRequest is the request for echo. type EchoRequest struct { Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *EchoRequest) Reset() { *m = EchoRequest{} } func (m *EchoRequest) String() string { return proto.CompactTextString(m) } func (*EchoRequest) ProtoMessage() {} func (*EchoRequest) Descriptor() ([]byte, []int) { return fileDescriptor_08134aea513e0001, []int{0} } func (m *EchoRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_EchoRequest.Unmarshal(m, b) } func (m *EchoRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_EchoRequest.Marshal(b, m, deterministic) } func (m *EchoRequest) XXX_Merge(src proto.Message) { xxx_messageInfo_EchoRequest.Merge(m, src) } func (m *EchoRequest) XXX_Size() int { return xxx_messageInfo_EchoRequest.Size(m) } func (m *EchoRequest) XXX_DiscardUnknown() { xxx_messageInfo_EchoRequest.DiscardUnknown(m) } var xxx_messageInfo_EchoRequest proto.InternalMessageInfo func (m *EchoRequest) GetMessage() string { if m != nil { return m.Message } return "" } // EchoResponse is the response for echo. type EchoResponse struct { Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } func (m *EchoResponse) Reset() { *m = EchoResponse{} } func (m *EchoResponse) String() string { return proto.CompactTextString(m) } func (*EchoResponse) ProtoMessage() {} func (*EchoResponse) Descriptor() ([]byte, []int) { return fileDescriptor_08134aea513e0001, []int{1} } func (m *EchoResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_EchoResponse.Unmarshal(m, b) } func (m *EchoResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_EchoResponse.Marshal(b, m, deterministic) } func (m *EchoResponse) XXX_Merge(src proto.Message) { xxx_messageInfo_EchoResponse.Merge(m, src) } func (m *EchoResponse) XXX_Size() int { return xxx_messageInfo_EchoResponse.Size(m) } func (m *EchoResponse) XXX_DiscardUnknown() { xxx_messageInfo_EchoResponse.DiscardUnknown(m) } var xxx_messageInfo_EchoResponse proto.InternalMessageInfo func (m *EchoResponse) GetMessage() string { if m != nil { return m.Message } return "" } func init() { proto.RegisterType((*EchoRequest)(nil), "grpc.examples.echo.EchoRequest") proto.RegisterType((*EchoResponse)(nil), "grpc.examples.echo.EchoResponse") } func init() { proto.RegisterFile("echo.proto", fileDescriptor_08134aea513e0001) } var fileDescriptor_08134aea513e0001 = []byte{ // 234 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x92, 0xb1, 0x4b, 0x03, 0x31, 0x14, 0x87, 0x3d, 0x11, 0xa5, 0x4f, 0xa7, 0xb8, 0x94, 0x2e, 0x96, 0x5b, 0xbc, 0x29, 0x29, 0x16, 0xff, 0x81, 0x8a, 0xbb, 0xb4, 0xb8, 0x88, 0x4b, 0x3c, 0x7f, 0xa6, 0x81, 0x5c, 0xde, 0xf9, 0x92, 0x8a, 0xfe, 0xed, 0x2e, 0x92, 0x2b, 0x05, 0x41, 0xba, 0xd5, 0x2d, 0x8f, 0x7c, 0xef, 0xfb, 0x96, 0x47, 0x84, 0x76, 0xcd, 0xba, 0x17, 0xce, 0xac, 0x94, 0x93, 0xbe, 0xd5, 0xf8, 0xb4, 0x5d, 0x1f, 0x90, 0x74, 0xf9, 0xa9, 0xaf, 0xe9, 0xfc, 0xbe, 0x5d, 0xf3, 0x12, 0xef, 0x1b, 0xa4, 0xac, 0xc6, 0x74, 0xd6, 0x21, 0x25, 0xeb, 0x30, 0xae, 0xa6, 0x55, 0x33, 0x5a, 0xee, 0xc6, 0xba, 0xa1, 0x8b, 0x2d, 0x98, 0x7a, 0x8e, 0x09, 0xfb, 0xc9, 0x9b, 0xef, 0x63, 0x3a, 0x29, 0xa8, 0x7a, 0xa0, 0xd1, 0x63, 0xb4, 0xf2, 0x35, 0x0c, 0x57, 0xfa, 0x6f, 0x5d, 0xff, 0x4a, 0x4f, 0xa6, 0xfb, 0x81, 0x6d, 0xb2, 0x3e, 0x52, 0xcf, 0x74, 0xb9, 0x82, 0x7c, 0x40, 0x56, 0x59, 0x60, 0x3b, 0x1f, 0xdd, 0xc1, 0xdc, 0xb3, 0xaa, 0xd8, 0xef, 0x82, 0x47, 0xcc, 0x87, 0xb7, 0x37, 0x95, 0x02, 0x4d, 0x16, 0xfe, 0xd5, 0x0b, 0xda, 0xec, 0x39, 0xda, 0xf0, 0x1f, 0x91, 0x59, 0xb5, 0xb8, 0x7d, 0x9a, 0x3b, 0x66, 0x17, 0xa0, 0x1d, 0x07, 0x1b, 0x9d, 0x66, 0x71, 0xa6, 0xac, 0x9a, 0xdd, 0xaa, 0x79, 0x83, 0xcd, 0x1b, 0x41, 0x32, 0xc3, 0x59, 0x98, 0x62, 0x7a, 0x39, 0x1d, 0xde, 0xf3, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x23, 0x14, 0x26, 0x96, 0x30, 0x02, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConn // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion4 // EchoClient is the client API for Echo service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type EchoClient interface { // UnaryEcho is unary echo. UnaryEcho(ctx context.Context, in *EchoRequest, opts ...grpc.CallOption) (*EchoResponse, error) // ServerStreamingEcho is server side streaming. ServerStreamingEcho(ctx context.Context, in *EchoRequest, opts ...grpc.CallOption) (Echo_ServerStreamingEchoClient, error) // ClientStreamingEcho is client side streaming. ClientStreamingEcho(ctx context.Context, opts ...grpc.CallOption) (Echo_ClientStreamingEchoClient, error) // BidirectionalStreamingEcho is bidi streaming. BidirectionalStreamingEcho(ctx context.Context, opts ...grpc.CallOption) (Echo_BidirectionalStreamingEchoClient, error) } type echoClient struct { cc *grpc.ClientConn } func NewEchoClient(cc *grpc.ClientConn) EchoClient { return &echoClient{cc} } func (c *echoClient) UnaryEcho(ctx context.Context, in *EchoRequest, opts ...grpc.CallOption) (*EchoResponse, error) { out := new(EchoResponse) err := c.cc.Invoke(ctx, "/grpc.examples.echo.Echo/UnaryEcho", in, out, opts...) if err != nil { return nil, err } return out, nil } func (c *echoClient) ServerStreamingEcho(ctx context.Context, in *EchoRequest, opts ...grpc.CallOption) (Echo_ServerStreamingEchoClient, error) { stream, err := c.cc.NewStream(ctx, &_Echo_serviceDesc.Streams[0], "/grpc.examples.echo.Echo/ServerStreamingEcho", opts...) if err != nil { return nil, err } x := &echoServerStreamingEchoClient{stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } if err := x.ClientStream.CloseSend(); err != nil { return nil, err } return x, nil } type Echo_ServerStreamingEchoClient interface { Recv() (*EchoResponse, error) grpc.ClientStream } type echoServerStreamingEchoClient struct { grpc.ClientStream } func (x *echoServerStreamingEchoClient) Recv() (*EchoResponse, error) { m := new(EchoResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } func (c *echoClient) ClientStreamingEcho(ctx context.Context, opts ...grpc.CallOption) (Echo_ClientStreamingEchoClient, error) { stream, err := c.cc.NewStream(ctx, &_Echo_serviceDesc.Streams[1], "/grpc.examples.echo.Echo/ClientStreamingEcho", opts...) if err != nil { return nil, err } x := &echoClientStreamingEchoClient{stream} return x, nil } type Echo_ClientStreamingEchoClient interface { Send(*EchoRequest) error CloseAndRecv() (*EchoResponse, error) grpc.ClientStream } type echoClientStreamingEchoClient struct { grpc.ClientStream } func (x *echoClientStreamingEchoClient) Send(m *EchoRequest) error { return x.ClientStream.SendMsg(m) } func (x *echoClientStreamingEchoClient) CloseAndRecv() (*EchoResponse, error) { if err := x.ClientStream.CloseSend(); err != nil { return nil, err } m := new(EchoResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } func (c *echoClient) BidirectionalStreamingEcho(ctx context.Context, opts ...grpc.CallOption) (Echo_BidirectionalStreamingEchoClient, error) { stream, err := c.cc.NewStream(ctx, &_Echo_serviceDesc.Streams[2], "/grpc.examples.echo.Echo/BidirectionalStreamingEcho", opts...) if err != nil { return nil, err } x := &echoBidirectionalStreamingEchoClient{stream} return x, nil } type Echo_BidirectionalStreamingEchoClient interface { Send(*EchoRequest) error Recv() (*EchoResponse, error) grpc.ClientStream } type echoBidirectionalStreamingEchoClient struct { grpc.ClientStream } func (x *echoBidirectionalStreamingEchoClient) Send(m *EchoRequest) error { return x.ClientStream.SendMsg(m) } func (x *echoBidirectionalStreamingEchoClient) Recv() (*EchoResponse, error) { m := new(EchoResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } // EchoServer is the server API for Echo service. type EchoServer interface { // UnaryEcho is unary echo. UnaryEcho(context.Context, *EchoRequest) (*EchoResponse, error) // ServerStreamingEcho is server side streaming. ServerStreamingEcho(*EchoRequest, Echo_ServerStreamingEchoServer) error // ClientStreamingEcho is client side streaming. ClientStreamingEcho(Echo_ClientStreamingEchoServer) error // BidirectionalStreamingEcho is bidi streaming. BidirectionalStreamingEcho(Echo_BidirectionalStreamingEchoServer) error } // UnimplementedEchoServer can be embedded to have forward compatible implementations. type UnimplementedEchoServer struct { } func (*UnimplementedEchoServer) UnaryEcho(ctx context.Context, req *EchoRequest) (*EchoResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method UnaryEcho not implemented") } func (*UnimplementedEchoServer) ServerStreamingEcho(req *EchoRequest, srv Echo_ServerStreamingEchoServer) error { return status.Errorf(codes.Unimplemented, "method ServerStreamingEcho not implemented") } func (*UnimplementedEchoServer) ClientStreamingEcho(srv Echo_ClientStreamingEchoServer) error { return status.Errorf(codes.Unimplemented, "method ClientStreamingEcho not implemented") } func (*UnimplementedEchoServer) BidirectionalStreamingEcho(srv Echo_BidirectionalStreamingEchoServer) error { return status.Errorf(codes.Unimplemented, "method BidirectionalStreamingEcho not implemented") } func RegisterEchoServer(s *grpc.Server, srv EchoServer) { s.RegisterService(&_Echo_serviceDesc, srv) } func _Echo_UnaryEcho_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(EchoRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(EchoServer).UnaryEcho(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/grpc.examples.echo.Echo/UnaryEcho", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(EchoServer).UnaryEcho(ctx, req.(*EchoRequest)) } return interceptor(ctx, in, info, handler) } func _Echo_ServerStreamingEcho_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(EchoRequest) if err := stream.RecvMsg(m); err != nil { return err } return srv.(EchoServer).ServerStreamingEcho(m, &echoServerStreamingEchoServer{stream}) } type Echo_ServerStreamingEchoServer interface { Send(*EchoResponse) error grpc.ServerStream } type echoServerStreamingEchoServer struct { grpc.ServerStream } func (x *echoServerStreamingEchoServer) Send(m *EchoResponse) error { return x.ServerStream.SendMsg(m) } func _Echo_ClientStreamingEcho_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(EchoServer).ClientStreamingEcho(&echoClientStreamingEchoServer{stream}) } type Echo_ClientStreamingEchoServer interface { SendAndClose(*EchoResponse) error Recv() (*EchoRequest, error) grpc.ServerStream } type echoClientStreamingEchoServer struct { grpc.ServerStream } func (x *echoClientStreamingEchoServer) SendAndClose(m *EchoResponse) error { return x.ServerStream.SendMsg(m) } func (x *echoClientStreamingEchoServer) Recv() (*EchoRequest, error) { m := new(EchoRequest) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } return m, nil } func _Echo_BidirectionalStreamingEcho_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(EchoServer).BidirectionalStreamingEcho(&echoBidirectionalStreamingEchoServer{stream}) } type Echo_BidirectionalStreamingEchoServer interface { Send(*EchoResponse) error Recv() (*EchoRequest, error) grpc.ServerStream } type echoBidirectionalStreamingEchoServer struct { grpc.ServerStream } func (x *echoBidirectionalStreamingEchoServer) Send(m *EchoResponse) error { return x.ServerStream.SendMsg(m) } func (x *echoBidirectionalStreamingEchoServer) Recv() (*EchoRequest, error) { m := new(EchoRequest) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } return m, nil } var _Echo_serviceDesc = grpc.ServiceDesc{ ServiceName: "grpc.examples.echo.Echo", HandlerType: (*EchoServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "UnaryEcho", Handler: _Echo_UnaryEcho_Handler, }, }, Streams: []grpc.StreamDesc{ { StreamName: "ServerStreamingEcho", Handler: _Echo_ServerStreamingEcho_Handler, ServerStreams: true, }, { StreamName: "ClientStreamingEcho", Handler: _Echo_ClientStreamingEcho_Handler, ClientStreams: true, }, { StreamName: "BidirectionalStreamingEcho", Handler: _Echo_BidirectionalStreamingEcho_Handler, ServerStreams: true, ClientStreams: true, }, }, Metadata: "echo.proto", }
以下、プログラムです。 "google.golang.org/grpc/metadata"
を使うのがミソですね。
-
server/main.go
package main import ( "context" "flag" "fmt" "io" "log" "math/rand" "net" "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" pb "google.golang.org/grpc/examples/features/proto/echo" ) var port = flag.Int("port", 50051, "the port to serve on") const ( timestampFormat = time.StampNano streamingCount = 10 ) type server struct { pb.UnimplementedEchoServer } func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) { fmt.Printf("--- UnaryEcho ---\n") // Create trailer in defer to record function return time. defer func() { trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) grpc.SetTrailer(ctx, trailer) }() // Read metadata from client. md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil, status.Errorf(codes.DataLoss, "UnaryEcho: failed to get metadata") } if t, ok := md["timestamp"]; ok { fmt.Printf("timestamp from metadata:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } // Create and send header. header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)}) grpc.SendHeader(ctx, header) fmt.Printf("request received: %v, sending echo\n", in) return &pb.EchoResponse{Message: in.Message}, nil } func (s *server) ServerStreamingEcho(in *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error { fmt.Printf("--- ServerStreamingEcho ---\n") // Create trailer in defer to record function return time. defer func() { trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) stream.SetTrailer(trailer) }() // Read metadata from client. md, ok := metadata.FromIncomingContext(stream.Context()) if !ok { return status.Errorf(codes.DataLoss, "ServerStreamingEcho: failed to get metadata") } if t, ok := md["timestamp"]; ok { fmt.Printf("timestamp from metadata:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } // Create and send header. header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)}) stream.SendHeader(header) fmt.Printf("request received: %v\n", in) // Read requests and send responses. for i := 0; i < streamingCount; i++ { fmt.Printf("echo message %v\n", in.Message) err := stream.Send(&pb.EchoResponse{Message: in.Message}) if err != nil { return err } } return nil } func (s *server) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error { fmt.Printf("--- ClientStreamingEcho ---\n") // Create trailer in defer to record function return time. defer func() { trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) stream.SetTrailer(trailer) }() // Read metadata from client. md, ok := metadata.FromIncomingContext(stream.Context()) if !ok { return status.Errorf(codes.DataLoss, "ClientStreamingEcho: failed to get metadata") } if t, ok := md["timestamp"]; ok { fmt.Printf("timestamp from metadata:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } // Create and send header. header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)}) stream.SendHeader(header) // Read requests and send responses. var message string for { in, err := stream.Recv() if err == io.EOF { fmt.Printf("echo last received message\n") return stream.SendAndClose(&pb.EchoResponse{Message: message}) } message = in.Message fmt.Printf("request received: %v, building echo\n", in) if err != nil { return err } } } func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error { fmt.Printf("--- BidirectionalStreamingEcho ---\n") // Create trailer in defer to record function return time. defer func() { trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) stream.SetTrailer(trailer) }() // Read metadata from client. md, ok := metadata.FromIncomingContext(stream.Context()) if !ok { return status.Errorf(codes.DataLoss, "BidirectionalStreamingEcho: failed to get metadata") } if t, ok := md["timestamp"]; ok { fmt.Printf("timestamp from metadata:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } // Create and send header. header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)}) stream.SendHeader(header) // Read requests and send responses. for { in, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } fmt.Printf("request received %v, sending echo\n", in) if err := stream.Send(&pb.EchoResponse{Message: in.Message}); err != nil { return err } } } func main() { flag.Parse() rand.Seed(time.Now().UnixNano()) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { log.Fatalf("failed to listen: %v", err) } fmt.Printf("server listening at %v\n", lis.Addr()) s := grpc.NewServer() pb.RegisterEchoServer(s, &server{}) s.Serve(lis) }
-
client/main.go
package main import ( "context" "flag" "fmt" "io" "log" "time" "google.golang.org/grpc" pb "google.golang.org/grpc/examples/features/proto/echo" "google.golang.org/grpc/metadata" ) var addr = flag.String("addr", "grpc-server:50051", "the address to connect to") const ( timestampFormat = time.StampNano // "Jan _2 15:04:05.000" streamingCount = 10 ) func unaryCallWithMetadata(c pb.EchoClient, message string) { fmt.Printf("--- unary ---\n") // Create metadata and context. md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) ctx := metadata.NewOutgoingContext(context.Background(), md) // Make RPC using the context with the metadata. var header, trailer metadata.MD r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: message}, grpc.Header(&header), grpc.Trailer(&trailer)) if err != nil { log.Fatalf("failed to call UnaryEcho: %v", err) } if t, ok := header["timestamp"]; ok { fmt.Printf("timestamp from header:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("timestamp expected but doesn't exist in header") } if l, ok := header["location"]; ok { fmt.Printf("location from header:\n") for i, e := range l { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("location expected but doesn't exist in header") } fmt.Printf("response:\n") fmt.Printf(" - %s\n", r.Message) if t, ok := trailer["timestamp"]; ok { fmt.Printf("timestamp from trailer:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("timestamp expected but doesn't exist in trailer") } } func serverStreamingWithMetadata(c pb.EchoClient, message string) { fmt.Printf("--- server streaming ---\n") // Create metadata and context. md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) ctx := metadata.NewOutgoingContext(context.Background(), md) // Make RPC using the context with the metadata. stream, err := c.ServerStreamingEcho(ctx, &pb.EchoRequest{Message: message}) if err != nil { log.Fatalf("failed to call ServerStreamingEcho: %v", err) } // Read the header when the header arrives. header, err := stream.Header() if err != nil { log.Fatalf("failed to get header from stream: %v", err) } // Read metadata from server's header. if t, ok := header["timestamp"]; ok { fmt.Printf("timestamp from header:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("timestamp expected but doesn't exist in header") } if l, ok := header["location"]; ok { fmt.Printf("location from header:\n") for i, e := range l { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("location expected but doesn't exist in header") } // Read all the responses. var rpcStatus error fmt.Printf("response:\n") for { r, err := stream.Recv() if err != nil { rpcStatus = err break } fmt.Printf(" - %s\n", r.Message) } if rpcStatus != io.EOF { log.Fatalf("failed to finish server streaming: %v", rpcStatus) } // Read the trailer after the RPC is finished. trailer := stream.Trailer() // Read metadata from server's trailer. if t, ok := trailer["timestamp"]; ok { fmt.Printf("timestamp from trailer:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("timestamp expected but doesn't exist in trailer") } } func clientStreamWithMetadata(c pb.EchoClient, message string) { fmt.Printf("--- client streaming ---\n") // Create metadata and context. md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) ctx := metadata.NewOutgoingContext(context.Background(), md) // Make RPC using the context with the metadata. stream, err := c.ClientStreamingEcho(ctx) if err != nil { log.Fatalf("failed to call ClientStreamingEcho: %v\n", err) } // Read the header when the header arrives. header, err := stream.Header() if err != nil { log.Fatalf("failed to get header from stream: %v", err) } // Read metadata from server's header. if t, ok := header["timestamp"]; ok { fmt.Printf("timestamp from header:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("timestamp expected but doesn't exist in header") } if l, ok := header["location"]; ok { fmt.Printf("location from header:\n") for i, e := range l { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("location expected but doesn't exist in header") } // Send all requests to the server. for i := 0; i < streamingCount; i++ { if err := stream.Send(&pb.EchoRequest{Message: message}); err != nil { log.Fatalf("failed to send streaming: %v\n", err) } } // Read the response. r, err := stream.CloseAndRecv() if err != nil { log.Fatalf("failed to CloseAndRecv: %v\n", err) } fmt.Printf("response:\n") fmt.Printf(" - %s\n\n", r.Message) // Read the trailer after the RPC is finished. trailer := stream.Trailer() // Read metadata from server's trailer. if t, ok := trailer["timestamp"]; ok { fmt.Printf("timestamp from trailer:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("timestamp expected but doesn't exist in trailer") } } func bidirectionalWithMetadata(c pb.EchoClient, message string) { fmt.Printf("--- bidirectional ---\n") // Create metadata and context. md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) ctx := metadata.NewOutgoingContext(context.Background(), md) // Make RPC using the context with the metadata. stream, err := c.BidirectionalStreamingEcho(ctx) if err != nil { log.Fatalf("failed to call BidirectionalStreamingEcho: %v\n", err) } go func() { // Read the header when the header arrives. header, err := stream.Header() if err != nil { log.Fatalf("failed to get header from stream: %v", err) } // Read metadata from server's header. if t, ok := header["timestamp"]; ok { fmt.Printf("timestamp from header:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("timestamp expected but doesn't exist in header") } if l, ok := header["location"]; ok { fmt.Printf("location from header:\n") for i, e := range l { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("location expected but doesn't exist in header") } // Send all requests to the server. for i := 0; i < streamingCount; i++ { if err := stream.Send(&pb.EchoRequest{Message: message}); err != nil { log.Fatalf("failed to send streaming: %v\n", err) } } stream.CloseSend() }() // Read all the responses. var rpcStatus error fmt.Printf("response:\n") for { r, err := stream.Recv() if err != nil { rpcStatus = err break } fmt.Printf(" - %s\n", r.Message) } if rpcStatus != io.EOF { log.Fatalf("failed to finish server streaming: %v", rpcStatus) } // Read the trailer after the RPC is finished. trailer := stream.Trailer() // Read metadata from server's trailer. if t, ok := trailer["timestamp"]; ok { fmt.Printf("timestamp from trailer:\n") for i, e := range t { fmt.Printf(" %d. %s\n", i, e) } } else { log.Fatal("timestamp expected but doesn't exist in trailer") } } const message = "this is examples/metadata" func main() { flag.Parse() // Set up a connection to the server. conn, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewEchoClient(conn) unaryCallWithMetadata(c, message) time.Sleep(1 * time.Second) serverStreamingWithMetadata(c, message) time.Sleep(1 * time.Second) clientStreamWithMetadata(c, message) time.Sleep(1 * time.Second) bidirectionalWithMetadata(c, message) }
- サーバはコンテキスト経由でメタデータを受け取ります。
md, ok := metadata.FromIncomingContext(ctx)
のようにコンテキストからメタデータを取得します。 メタデータはマップ型なのでmd["timestamp"]
のように値を抽出します クライアントに送信する場合は、 SendHeader か SetTrailer メソッドを使います。 レスポンスの前に送るメタデータが Header, あとに送るメタデータが Trailer ということらしいです。 これは gRPC というより HTTP2 の用語です。 (gRPCにおけるmetadata、そしてそれを node.js client から取得する - Qiita) - クライアントからメタデータを送信するにはコンテキストを使います。
メタデータ自体は metadata.Pairs で作り、
metadata.NewOutgoingContext(context.Background(), md)
でメタデータを紐付けたコンテキストを作成します。 受信する方法ですが、Unary RPC の場合は...grpc.CallOption
に複数指定します。 Headerを受け取るときはgrpc.Header(&header), Trailer
を受け取るときはgrpc.Trailer(&trailer)
のように metadata.MD 変数のアドレスを渡すことでサーバからのメタデータを受け取ります。 Streaming RPC の場合はstream.Header()
やstream.Trailer()
のように得られます。
とりあえず使ってみます。
~/features/metadata/server# go run .
~/features/metadata/client# go run .
-
server listening at [::]:50051 --- UnaryEcho --- timestamp from metadata: 0. Feb 1 17:27:41.704179000 request received: message:"this is examples/metadata" , sending echo --- ServerStreamingEcho --- timestamp from metadata: 0. Feb 1 17:27:42.761855500 request received: message:"this is examples/metadata" echo message this is examples/metadata echo message this is examples/metadata echo message this is examples/metadata echo message this is examples/metadata echo message this is examples/metadata echo message this is examples/metadata echo message this is examples/metadata echo message this is examples/metadata echo message this is examples/metadata echo message this is examples/metadata --- ClientStreamingEcho --- timestamp from metadata: 0. Feb 1 17:27:43.764238600 request received: message:"this is examples/metadata" , building echo request received: message:"this is examples/metadata" , building echo request received: message:"this is examples/metadata" , building echo request received: message:"this is examples/metadata" , building echo request received: message:"this is examples/metadata" , building echo request received: message:"this is examples/metadata" , building echo request received: message:"this is examples/metadata" , building echo request received: message:"this is examples/metadata" , building echo request received: message:"this is examples/metadata" , building echo request received: message:"this is examples/metadata" , building echo echo last received message --- BidirectionalStreamingEcho --- timestamp from metadata: 0. Feb 1 17:27:44.769518100 request received message:"this is examples/metadata" , sending echo request received message:"this is examples/metadata" , sending echo request received message:"this is examples/metadata" , sending echo request received message:"this is examples/metadata" , sending echo request received message:"this is examples/metadata" , sending echo request received message:"this is examples/metadata" , sending echo request received message:"this is examples/metadata" , sending echo request received message:"this is examples/metadata" , sending echo request received message:"this is examples/metadata" , sending echo request received message:"this is examples/metadata" , sending echo
-
--- unary --- timestamp from header: 0. Feb 1 17:27:41.717621700 location from header: 0. MTV response: - this is examples/metadata timestamp from trailer: 0. Feb 1 17:27:41.717779600 --- server streaming --- timestamp from header: 0. Feb 1 17:27:42.763338200 location from header: 0. MTV response: - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata timestamp from trailer: 0. Feb 1 17:27:42.763500200 --- client streaming --- timestamp from header: 0. Feb 1 17:27:43.764817300 location from header: 0. MTV response: - this is examples/metadata timestamp from trailer: 0. Feb 1 17:27:43.767922700 --- bidirectional --- response: timestamp from header: 0. Feb 1 17:27:44.771782200 location from header: 0. MTV - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata - this is examples/metadata timestamp from trailer: 0. Feb 1 17:27:44.772841200
出力が多くてちょっと分かりづらいですが、 メタデータがやり取りできてるっぽいです。
使い分けを簡単にまとめ。ストリーミングは全部同じですが..
- 種類
-
- サーバ
- クライアント
-
- Unary RPC
-
-
-
- header
grpc.SetTrailer(ctx, trailer)
-
- trailer
grpc.SendHeader(ctx, header)
-
-
c.UnaryEcho(ctx, &pb.EchoRequest{Message: message}, grpc.Header(&header), grpc.Trailer(&trailer))
-
-
- Server streaming RPC
-
-
-
- header
stream.SendHeader(header)
-
- trailer
stream.SetTrailer(trailer)
-
-
-
- header
header := stream.Header()
-
- trailer
trailer := stream.Trailer()
-
-
-
- Client streaming RPC
-
-
-
- header
stream.SendHeader(header)
-
- trailer
stream.SetTrailer(trailer)
-
-
-
- header
header := stream.Header()
-
- trailer
trailer := stream.Trailer()
-
-
-
- Bidiretional stream RPC
-
-
-
- header
stream.SendHeader(header)
-
- trailer
stream.SetTrailer(trailer)
-
-
-
- header
header := stream.Header()
-
- trailer
trailer := stream.Trailer()
-
-
-
流石に疲れたのでここまで
テストは別の記事にするかもしれません。(しれないかもしれません)
間違いがあったら優しく教えて下さい。
参考