Go で gRPC を使うためのまとめ

2020-02-06

gRPCを使いそうなので記事にまとめておきます。

ちょっとだけ長いので必要なところだけ読めばいいと思います。 あとスマホで見ようとすると重いのでPCで見ることをおすすめします。

今回使うコードは https://github.com/righ/grpc-go-example/ リポジトリに置いてあります。

https://github.com/grpc/grpc-go/examples をベースに修正と追記をしたものです。

clone して docker-compose すればコンテナが2つ起動します。

$ git clone https://github.com/righ/grpc-go-example/
$ cd grpc-go-example/

$ docker-compose up
Creating network "grpc_default" with the default driver
Creating go2 ... done
Creating go1 ... done
Attaching to go2, go1

以下の内容はすべてこれらのコンテナが起動している前提なので、手元で試したい方は clone してください。

目次

🌀 とりあえず使ってみる

起動した2つのコンテナ(grpc-server,grpc-client)に入って プログラムをインストールします。

インストールしたプログラムは $GOPATH/bin/ に配置されるのでそのまま実行できます。

grpc-server

grpc-client

$ docker exec -it grpc-server /bin/bash
root@82769e87d925:~# go get -u github.com/righ/grpc-go-example/helloworld/greeter_server
root@82769e87d925:~# greeter_server

2020/02/01 12:34:44 Received: world

2020/02/01 12:34:51 Received: world
$ docker exec -it grpc-client /bin/bash
root@189290506d75:~# go get -u github.com/righ/grpc-go-example/helloworld/greeter_client

root@189290506d75:~# greeter_client
2020/02/01 12:34:44 Greeting: Hello world
root@189290506d75:~# greeter_client
2020/02/01 12:34:51 Greeting: Hello world

クライアントプログラムを実行するたびに以下の処理が実行されているだけです。

  • リクエストを受け取ったサーバはその内容を表示

  • クライアントは返却されたレスポンスを表示

備考

以降のプログラムも、サーバは grpc-server コンテナ、 クライアントは grpc-client コンテナで動くことを前提に書いています。

この程度の通信プログラムくらいなら自力でも簡単に書くことができますよね。

何故 gRPC を使うと嬉しいのか。 gRPC 自体が高速だというのもあるのですが、別の理由としては Protocol Buffer の存在が大きいです。

📖 Protocol Buffer

まずは簡単な説明を。

Protocol Buffers(プロトコルバッファー)はインタフェース定義言語 (IDL) で構造を定義する通信や永続化での利用を目的としたシリアライズフォーマットであり、Googleにより開発されている。

https://ja.wikipedia.org/wiki/Protocol_Buffers

gRPCはこの Protocol Buffer で定義された構造を使って通信を行います。

定義は下記(左)のようなテキストファイルです(拡張子を proto とするのが慣例)

grpc-go-example/helloworld/helloworld/helloworld.proto
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

grpc-go-example/helloworld/helloworld/helloworld.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: helloworld.proto

package helloworld

import (
	context "context"
	fmt "fmt"
	proto "github.com/golang/protobuf/proto"
	grpc "google.golang.org/grpc"
	codes "google.golang.org/grpc/codes"
	status "google.golang.org/grpc/status"
	math "math"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

// The request message containing the user's name.
type HelloRequest struct {
	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *HelloRequest) Reset()         { *m = HelloRequest{} }
func (m *HelloRequest) String() string { return proto.CompactTextString(m) }
func (*HelloRequest) ProtoMessage()    {}
func (*HelloRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_17b8c58d586b62f2, []int{0}
}

func (m *HelloRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_HelloRequest.Unmarshal(m, b)
}
func (m *HelloRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_HelloRequest.Marshal(b, m, deterministic)
}
func (m *HelloRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_HelloRequest.Merge(m, src)
}
func (m *HelloRequest) XXX_Size() int {
	return xxx_messageInfo_HelloRequest.Size(m)
}
func (m *HelloRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_HelloRequest.DiscardUnknown(m)
}

var xxx_messageInfo_HelloRequest proto.InternalMessageInfo

func (m *HelloRequest) GetName() string {
	if m != nil {
		return m.Name
	}
	return ""
}

// The response message containing the greetings
type HelloReply struct {
	Message              string   `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *HelloReply) Reset()         { *m = HelloReply{} }
func (m *HelloReply) String() string { return proto.CompactTextString(m) }
func (*HelloReply) ProtoMessage()    {}
func (*HelloReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_17b8c58d586b62f2, []int{1}
}

func (m *HelloReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_HelloReply.Unmarshal(m, b)
}
func (m *HelloReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_HelloReply.Marshal(b, m, deterministic)
}
func (m *HelloReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_HelloReply.Merge(m, src)
}
func (m *HelloReply) XXX_Size() int {
	return xxx_messageInfo_HelloReply.Size(m)
}
func (m *HelloReply) XXX_DiscardUnknown() {
	xxx_messageInfo_HelloReply.DiscardUnknown(m)
}

var xxx_messageInfo_HelloReply proto.InternalMessageInfo

func (m *HelloReply) GetMessage() string {
	if m != nil {
		return m.Message
	}
	return ""
}

func init() {
	proto.RegisterType((*HelloRequest)(nil), "helloworld.HelloRequest")
	proto.RegisterType((*HelloReply)(nil), "helloworld.HelloReply")
}

func init() { proto.RegisterFile("helloworld.proto", fileDescriptor_17b8c58d586b62f2) }

var fileDescriptor_17b8c58d586b62f2 = []byte{
	// 175 bytes of a gzipped FileDescriptorProto
	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0xc8, 0x48, 0xcd, 0xc9,
	0xc9, 0x2f, 0xcf, 0x2f, 0xca, 0x49, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x42, 0x88,
	0x28, 0x29, 0x71, 0xf1, 0x78, 0x80, 0x78, 0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x42, 0x42,
	0x5c, 0x2c, 0x79, 0x89, 0xb9, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x60, 0xb6, 0x92,
	0x1a, 0x17, 0x17, 0x54, 0x4d, 0x41, 0x4e, 0xa5, 0x90, 0x04, 0x17, 0x7b, 0x6e, 0x6a, 0x71, 0x71,
	0x62, 0x3a, 0x4c, 0x11, 0x8c, 0x6b, 0xe4, 0xc9, 0xc5, 0xee, 0x5e, 0x94, 0x9a, 0x5a, 0x92, 0x5a,
	0x24, 0x64, 0xc7, 0xc5, 0x11, 0x9c, 0x58, 0x09, 0xd6, 0x25, 0x24, 0xa1, 0x87, 0xe4, 0x02, 0x64,
	0xcb, 0xa4, 0xc4, 0xb0, 0xc8, 0x14, 0xe4, 0x54, 0x2a, 0x31, 0x38, 0x19, 0x70, 0x49, 0x67, 0xe6,
	0xeb, 0xa5, 0x17, 0x15, 0x24, 0xeb, 0xa5, 0x56, 0x24, 0xe6, 0x16, 0xe4, 0xa4, 0x16, 0x23, 0xa9,
	0x75, 0xe2, 0x07, 0x2b, 0x0e, 0x07, 0xb1, 0x03, 0x40, 0x5e, 0x0a, 0x60, 0x4c, 0x62, 0x03, 0xfb,
	0xcd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x0f, 0xb7, 0xcd, 0xf2, 0xef, 0x00, 0x00, 0x00,
}

// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6

// GreeterClient is the client API for Greeter service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type GreeterClient interface {
	// Sends a greeting
	SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}

type greeterClient struct {
	cc grpc.ClientConnInterface
}

func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
	return &greeterClient{cc}
}

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
	out := new(HelloReply)
	err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// GreeterServer is the server API for Greeter service.
type GreeterServer interface {
	// Sends a greeting
	SayHello(context.Context, *HelloRequest) (*HelloReply, error)
}

// UnimplementedGreeterServer can be embedded to have forward compatible implementations.
type UnimplementedGreeterServer struct {
}

func (*UnimplementedGreeterServer) SayHello(ctx context.Context, req *HelloRequest) (*HelloReply, error) {
	return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented")
}

func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
	s.RegisterService(&_Greeter_serviceDesc, srv)
}

func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(HelloRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(GreeterServer).SayHello(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/helloworld.Greeter/SayHello",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
	}
	return interceptor(ctx, in, info, handler)
}

var _Greeter_serviceDesc = grpc.ServiceDesc{
	ServiceName: "helloworld.Greeter",
	HandlerType: (*GreeterServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "SayHello",
			Handler:    _Greeter_SayHello_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "helloworld.proto",
}

この定義をもとにコードを自動生成(上記右)して、自動生成されたコード(パラメータなど)を使って実装することで、定義した構造を用いて通信できるというわけです。

このコードを自動生成するのが protoc コマンドです。

Macであれば brew を使ってインストールすることもできますが、 それ以外の環境を利用する場合は Releases からダウンロードしてきましょう

今回は 簡単なスクリプト を使ってインストールします。これで protoc コマンドが使えるようになります。

とりあえず使い方とオプションはこんな感じ
root@2b588693c971:~# ./protoc_install.sh
root@2b588693c971:~# protoc --version
libprotoc 3.11.2

# protoファイルが置いてあるディレクトリに移動
root@2b588693c971:~# cd helloworld/helloworld
root@2b588693c971:~/helloworld/helloworld# protoc --go_out=plugins=grpc:. helloworld.proto

# 指定したディレクトリに `ファイル名.pb.go` が作られる
root@2b588693c971:~/helloworld/helloworld# ls
helloworld.pb.go  helloworld.proto

# オプション
root@2b588693c971:~/helloworld/helloworld# protoc -help
Usage: protoc [OPTION] PROTO_FILES
Parse PROTO_FILES and generate output based on the options given:
  -IPATH, --proto_path=PATH   Specify the directory in which to search for
                              imports.  May be specified multiple times;
                              directories will be searched in order.  If not
                              given, the current working directory is used.
                              If not found in any of the these directories,
                              the --descriptor_set_in descriptors will be
                              checked for required proto file.
  --version                   Show version info and exit.
  -h, --help                  Show this text and exit.
  --encode=MESSAGE_TYPE       Read a text-format message of the given type
                              from standard input and write it in binary
                              to standard output.  The message type must
                              be defined in PROTO_FILES or their imports.
  --decode=MESSAGE_TYPE       Read a binary message of the given type from
                              standard input and write it in text format
                              to standard output.  The message type must
                              be defined in PROTO_FILES or their imports.
  --decode_raw                Read an arbitrary protocol message from
                              standard input and write the raw tag/value
                              pairs in text format to standard output.  No
                              PROTO_FILES should be given when using this
                              flag.
  --descriptor_set_in=FILES   Specifies a delimited list of FILES
                              each containing a FileDescriptorSet (a
                              protocol buffer defined in descriptor.proto).
                              The FileDescriptor for each of the PROTO_FILES
                              provided will be loaded from these
                              FileDescriptorSets. If a FileDescriptor
                              appears multiple times, the first occurrence
                              will be used.
  -oFILE,                     Writes a FileDescriptorSet (a protocol buffer,
    --descriptor_set_out=FILE defined in descriptor.proto) containing all of
                              the input files to FILE.
  --include_imports           When using --descriptor_set_out, also include
                              all dependencies of the input files in the
                              set, so that the set is self-contained.
  --include_source_info       When using --descriptor_set_out, do not strip
                              SourceCodeInfo from the FileDescriptorProto.
                              This results in vastly larger descriptors that
                              include information about the original
                              location of each decl in the source file as
                              well as surrounding comments.
  --dependency_out=FILE       Write a dependency output file in the format
                              expected by make. This writes the transitive
                              set of input file paths to FILE
  --error_format=FORMAT       Set the format in which to print errors.
                              FORMAT may be 'gcc' (the default) or 'msvs'
                              (Microsoft Visual Studio format).
  --print_free_field_numbers  Print the free field numbers of the messages
                              defined in the given proto files. Groups share
                              the same field number space with the parent
                              message. Extension ranges are counted as
                              occupied fields numbers.

  --plugin=EXECUTABLE         Specifies a plugin executable to use.
                              Normally, protoc searches the PATH for
                              plugins, but you may specify additional
                              executables not in the path using this flag.
                              Additionally, EXECUTABLE may be of the form
                              NAME=PATH, in which case the given plugin name
                              is mapped to the given executable even if
                              the executable's own name differs.
  --cpp_out=OUT_DIR           Generate C++ header and source.
  --csharp_out=OUT_DIR        Generate C# source file.
  --java_out=OUT_DIR          Generate Java source file.
  --js_out=OUT_DIR            Generate JavaScript source.
  --objc_out=OUT_DIR          Generate Objective C header and source.
  --php_out=OUT_DIR           Generate PHP source file.
  --python_out=OUT_DIR        Generate Python source file.
  --ruby_out=OUT_DIR          Generate Ruby source file.
  @<filename>                 Read options and filenames from file. If a
                              relative file path is specified, the file
                              will be searched in the working directory.
                              The --proto_path option will not affect how
                              this argument file is searched. Content of
                              the file will be expanded in the position of
                              @<filename> as in the argument list. Note
                              that shell expansion is not applied to the
                              content of the file (i.e., you cannot use
                              quotes, wildcards, escapes, commands, etc.).
                              Each line corresponds to a single argument,
                              even if it contains spaces.

警告

このスクリプトの中では Goのコードを生成するための protoc-gen-go も同時にインストールしています

これがない状態でコードを生成しようとすると次のようなエラーになるので手動でインストールする場合は注意してください。

protoc-gen-go: program not found or is not executable

syntax

Protocol Buffer のバージョン

現時点(2020-02)では 3 が最新。

option

protoc に与えるオプション

現状 Golang 向けのオプションは go_package だけっぽいです。

備考

option go_package = "../ptypes"; のように指定して以下のように実行すれば ../ptypes/ に pb ファイルが出力されます。

~/helloworld/helloworld# protoc --go_out=plugins=grpc:. helloworld.proto
~/helloworld/helloworld# ls ../ptypes/
helloworld.pb.go

ちなみに --go_out=plugins=grpc:{パス} を起点に出力されるため go_package にフルパスを指定するなら、パスには / を指定します。

package

パッケージ名

(Golangでは)Goファイルのパッケージ名になるため出力先のディレクトリ名と合わせるべき

import

別ファイルの定義を取り込む。

message

通信に利用されるパラメータやレスポンスの型定義

service

機能をまとめた単位。サービスには RPC が紐づく。

  • (Golangでは)定義したServiceの数だけ Server, Client のインタフェースが作られる

    • Serviceの rpc フィールドはそれぞれのインタフェースのメソッドとして登録される

参考

⭐️ RPC types

gRPCは4種類の通信方法を提供します。

というと身構えてしまいそうですが、 簡単に言うと「 stream を指定するかしないか」 「クライアント側で指定するかサーバ側で指定するか」で名称が分かれているだけです。

種別

サーバサイド

クライアントサイド

Unary RPC

blockdiag clientserverRequestResponse

Server streaming RPC

stream

blockdiag clientserverRequestResponseResponse

Client streaming RPC

stream

blockdiag clientserverRequestRequestResponse

Bidirectional streaming

stream

stream

blockdiag clientserverRequestResponseResponseRequestRequestRequest

ではストリーミングとは何かというと、 連続した通信(リクエスト,レスポンス)で、これまでの単発の通信と対比する呼称です。

gRPCはリクエスト、レスポンスともにデータサイズに上限があります。やり取りするデータが可変長で巨大になる可能性がある場合はストリーミングによって通信を分割することが望ましいです。

🌚 Unary RPC

1つのリクエストに対して1つのレスポンスを返す最も基本的なRPCです。

Unary RPC - gRPC

クライアント、サーバともに stream を指定しなければデフォルトでこれになります。

先程も見ましたが、定義ファイルをもう一度確認しておきましょう。

grpc-go-example/helloworld/helloworld/helloworld.proto
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

grpc-go-example/helloworld/helloworld/helloworld.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: helloworld.proto

package helloworld

import (
	context "context"
	fmt "fmt"
	proto "github.com/golang/protobuf/proto"
	grpc "google.golang.org/grpc"
	codes "google.golang.org/grpc/codes"
	status "google.golang.org/grpc/status"
	math "math"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

// The request message containing the user's name.
type HelloRequest struct {
	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *HelloRequest) Reset()         { *m = HelloRequest{} }
func (m *HelloRequest) String() string { return proto.CompactTextString(m) }
func (*HelloRequest) ProtoMessage()    {}
func (*HelloRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_17b8c58d586b62f2, []int{0}
}

func (m *HelloRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_HelloRequest.Unmarshal(m, b)
}
func (m *HelloRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_HelloRequest.Marshal(b, m, deterministic)
}
func (m *HelloRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_HelloRequest.Merge(m, src)
}
func (m *HelloRequest) XXX_Size() int {
	return xxx_messageInfo_HelloRequest.Size(m)
}
func (m *HelloRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_HelloRequest.DiscardUnknown(m)
}

var xxx_messageInfo_HelloRequest proto.InternalMessageInfo

func (m *HelloRequest) GetName() string {
	if m != nil {
		return m.Name
	}
	return ""
}

// The response message containing the greetings
type HelloReply struct {
	Message              string   `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *HelloReply) Reset()         { *m = HelloReply{} }
func (m *HelloReply) String() string { return proto.CompactTextString(m) }
func (*HelloReply) ProtoMessage()    {}
func (*HelloReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_17b8c58d586b62f2, []int{1}
}

func (m *HelloReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_HelloReply.Unmarshal(m, b)
}
func (m *HelloReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_HelloReply.Marshal(b, m, deterministic)
}
func (m *HelloReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_HelloReply.Merge(m, src)
}
func (m *HelloReply) XXX_Size() int {
	return xxx_messageInfo_HelloReply.Size(m)
}
func (m *HelloReply) XXX_DiscardUnknown() {
	xxx_messageInfo_HelloReply.DiscardUnknown(m)
}

var xxx_messageInfo_HelloReply proto.InternalMessageInfo

func (m *HelloReply) GetMessage() string {
	if m != nil {
		return m.Message
	}
	return ""
}

func init() {
	proto.RegisterType((*HelloRequest)(nil), "helloworld.HelloRequest")
	proto.RegisterType((*HelloReply)(nil), "helloworld.HelloReply")
}

func init() { proto.RegisterFile("helloworld.proto", fileDescriptor_17b8c58d586b62f2) }

var fileDescriptor_17b8c58d586b62f2 = []byte{
	// 175 bytes of a gzipped FileDescriptorProto
	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0xc8, 0x48, 0xcd, 0xc9,
	0xc9, 0x2f, 0xcf, 0x2f, 0xca, 0x49, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x42, 0x88,
	0x28, 0x29, 0x71, 0xf1, 0x78, 0x80, 0x78, 0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x42, 0x42,
	0x5c, 0x2c, 0x79, 0x89, 0xb9, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x60, 0xb6, 0x92,
	0x1a, 0x17, 0x17, 0x54, 0x4d, 0x41, 0x4e, 0xa5, 0x90, 0x04, 0x17, 0x7b, 0x6e, 0x6a, 0x71, 0x71,
	0x62, 0x3a, 0x4c, 0x11, 0x8c, 0x6b, 0xe4, 0xc9, 0xc5, 0xee, 0x5e, 0x94, 0x9a, 0x5a, 0x92, 0x5a,
	0x24, 0x64, 0xc7, 0xc5, 0x11, 0x9c, 0x58, 0x09, 0xd6, 0x25, 0x24, 0xa1, 0x87, 0xe4, 0x02, 0x64,
	0xcb, 0xa4, 0xc4, 0xb0, 0xc8, 0x14, 0xe4, 0x54, 0x2a, 0x31, 0x38, 0x19, 0x70, 0x49, 0x67, 0xe6,
	0xeb, 0xa5, 0x17, 0x15, 0x24, 0xeb, 0xa5, 0x56, 0x24, 0xe6, 0x16, 0xe4, 0xa4, 0x16, 0x23, 0xa9,
	0x75, 0xe2, 0x07, 0x2b, 0x0e, 0x07, 0xb1, 0x03, 0x40, 0x5e, 0x0a, 0x60, 0x4c, 0x62, 0x03, 0xfb,
	0xcd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x0f, 0xb7, 0xcd, 0xf2, 0xef, 0x00, 0x00, 0x00,
}

// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6

// GreeterClient is the client API for Greeter service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type GreeterClient interface {
	// Sends a greeting
	SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}

type greeterClient struct {
	cc grpc.ClientConnInterface
}

func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
	return &greeterClient{cc}
}

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
	out := new(HelloReply)
	err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// GreeterServer is the server API for Greeter service.
type GreeterServer interface {
	// Sends a greeting
	SayHello(context.Context, *HelloRequest) (*HelloReply, error)
}

// UnimplementedGreeterServer can be embedded to have forward compatible implementations.
type UnimplementedGreeterServer struct {
}

func (*UnimplementedGreeterServer) SayHello(ctx context.Context, req *HelloRequest) (*HelloReply, error) {
	return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented")
}

func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
	s.RegisterService(&_Greeter_serviceDesc, srv)
}

func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(HelloRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(GreeterServer).SayHello(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/helloworld.Greeter/SayHello",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
	}
	return interceptor(ctx, in, info, handler)
}

var _Greeter_serviceDesc = grpc.ServiceDesc{
	ServiceName: "helloworld.Greeter",
	HandlerType: (*GreeterServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "SayHello",
			Handler:    _Greeter_SayHello_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "helloworld.proto",
}

これらを使ってサーバとクライアントのコードを書いていきます。

grpc-go-example/helloworld/greeter_server/main.go
package main

import (
	"context"
	"log"
	"net"

	pb "github.com/righ/grpc-go-example/helloworld/helloworld"
	"google.golang.org/grpc"
)

const (
	port = ":50051"
)

// server is used to implement helloworld.GreeterServer.
type server struct {
	pb.UnimplementedGreeterServer
}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	log.Printf("Received: %v", in.GetName())
	return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

grpc-go-example/helloworld/greeter_client/main.go
package main

import (
	"context"
	"log"
	"os"
	"time"

	pb "github.com/righ/grpc-go-example/helloworld/helloworld"
	"google.golang.org/grpc"
)

const (
	address     = "grpc-server:50051"
	defaultName = "world"
)

func main() {
	// Set up a connection to the server.
	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// Contact the server and print out its response.
	name := defaultName
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetMessage())
}

これは以降全てに共通することですが、サーバ側のコードでは pb.Register{Service名}Server 関数を使って「grpcのサーバ」と「サービス」を紐付けるわけですが、サービスには上述したインタフェースを引数に取るため、構造体には rpc に相当するメソッドが定義されている必要があります。

Unary RPC では サーバ側のRPCメソッド はリクエストのパラメータとして構造体を受け取って レスポンスとして構造体を返却するだけです。

Unary RPC のクライアント側コードは更にシンプルで pb.New{Service名}Client で作成したクライアントから呼び出したい rpc のメソッドを呼び出すだけです。

サーバと違い、レスポンスを返却値として返すだけなのでメソッドを実装する必要はありません。

これは Unary RPC に限った話ではないですが、 サーバ側のインタフェースを満たす空の構造体(Unimplemented{サービス名}Server)が自動的に生成されるので、 それを使えばひとまずサーバとして動くようになります。

今回のexampleのように、これ(UnimplementedServer)を埋め込んだ構造体を自分で定義して、 必要なメソッドを追加していくというのも良さそうです。

type server struct {
      pb.UnimplementedGreeterServer
}

実装すべき機能が多い場合は便利ですね。

警告

Unary RPC は最も基本的な gRPC ではありますが、後述する RPC と引数が異なるので注意してください。

利用例は最初のセクションを参照してください。

🌜 Server streaming RPC

1つのリクエストに対して、複数(N)のレスポンスを返します。

Server streaming RPC - gRPC

まずはproto定義と自動生成ファイル。 レスポンスにだけ stream が指定されています。

grpc-go-example/goodnightworld/goodnightworld/goodnightworld.proto
syntax = "proto3";

package goodnightworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayGoodnight (GoodnightRequest) returns (stream GoodnightReply) {}
}

// The request message containing the user's name.
message GoodnightRequest {
  string name = 1;
}

// The response message containing the greetings
message GoodnightReply {
  string message = 1;
}

grpc-go-example/goodnightworld/goodnightworld/goodnightworld.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: goodnightworld.proto

package goodnightworld

import (
	context "context"
	fmt "fmt"
	proto "github.com/golang/protobuf/proto"
	grpc "google.golang.org/grpc"
	codes "google.golang.org/grpc/codes"
	status "google.golang.org/grpc/status"
	math "math"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

// The request message containing the user's name.
type GoodnightRequest struct {
	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *GoodnightRequest) Reset()         { *m = GoodnightRequest{} }
func (m *GoodnightRequest) String() string { return proto.CompactTextString(m) }
func (*GoodnightRequest) ProtoMessage()    {}
func (*GoodnightRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_a27943471a8d8b0a, []int{0}
}

func (m *GoodnightRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_GoodnightRequest.Unmarshal(m, b)
}
func (m *GoodnightRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_GoodnightRequest.Marshal(b, m, deterministic)
}
func (m *GoodnightRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_GoodnightRequest.Merge(m, src)
}
func (m *GoodnightRequest) XXX_Size() int {
	return xxx_messageInfo_GoodnightRequest.Size(m)
}
func (m *GoodnightRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_GoodnightRequest.DiscardUnknown(m)
}

var xxx_messageInfo_GoodnightRequest proto.InternalMessageInfo

func (m *GoodnightRequest) GetName() string {
	if m != nil {
		return m.Name
	}
	return ""
}

// The response message containing the greetings
type GoodnightReply struct {
	Message              string   `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *GoodnightReply) Reset()         { *m = GoodnightReply{} }
func (m *GoodnightReply) String() string { return proto.CompactTextString(m) }
func (*GoodnightReply) ProtoMessage()    {}
func (*GoodnightReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_a27943471a8d8b0a, []int{1}
}

func (m *GoodnightReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_GoodnightReply.Unmarshal(m, b)
}
func (m *GoodnightReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_GoodnightReply.Marshal(b, m, deterministic)
}
func (m *GoodnightReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_GoodnightReply.Merge(m, src)
}
func (m *GoodnightReply) XXX_Size() int {
	return xxx_messageInfo_GoodnightReply.Size(m)
}
func (m *GoodnightReply) XXX_DiscardUnknown() {
	xxx_messageInfo_GoodnightReply.DiscardUnknown(m)
}

var xxx_messageInfo_GoodnightReply proto.InternalMessageInfo

func (m *GoodnightReply) GetMessage() string {
	if m != nil {
		return m.Message
	}
	return ""
}

func init() {
	proto.RegisterType((*GoodnightRequest)(nil), "goodnightworld.GoodnightRequest")
	proto.RegisterType((*GoodnightReply)(nil), "goodnightworld.GoodnightReply")
}

func init() { proto.RegisterFile("goodnightworld.proto", fileDescriptor_a27943471a8d8b0a) }

var fileDescriptor_a27943471a8d8b0a = []byte{
	// 148 bytes of a gzipped FileDescriptorProto
	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x49, 0xcf, 0xcf, 0x4f,
	0xc9, 0xcb, 0x4c, 0xcf, 0x28, 0x29, 0xcf, 0x2f, 0xca, 0x49, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9,
	0x17, 0xe2, 0x43, 0x15, 0x55, 0x52, 0xe3, 0x12, 0x70, 0x87, 0x89, 0x04, 0xa5, 0x16, 0x96, 0xa6,
	0x16, 0x97, 0x08, 0x09, 0x71, 0xb1, 0xe4, 0x25, 0xe6, 0xa6, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0x70,
	0x06, 0x81, 0xd9, 0x4a, 0x5a, 0x5c, 0x7c, 0x48, 0xea, 0x0a, 0x72, 0x2a, 0x85, 0x24, 0xb8, 0xd8,
	0x73, 0x53, 0x8b, 0x8b, 0x13, 0xd3, 0x61, 0x0a, 0x61, 0x5c, 0xa3, 0x78, 0x2e, 0x76, 0xf7, 0xa2,
	0xd4, 0xd4, 0x92, 0xd4, 0x22, 0xa1, 0x10, 0x2e, 0x9e, 0xe0, 0xc4, 0x4a, 0xb8, 0x4e, 0x21, 0x05,
	0x3d, 0x34, 0x57, 0xa1, 0x5b, 0x2e, 0x25, 0x87, 0x47, 0x45, 0x41, 0x4e, 0xa5, 0x12, 0x83, 0x01,
	0x63, 0x12, 0x1b, 0xd8, 0x2f, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0xae, 0x2b, 0xc8,
	0xe3, 0x00, 0x00, 0x00,
}

// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6

// GreeterClient is the client API for Greeter service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type GreeterClient interface {
	// Sends a greeting
	SayGoodnight(ctx context.Context, in *GoodnightRequest, opts ...grpc.CallOption) (Greeter_SayGoodnightClient, error)
}

type greeterClient struct {
	cc grpc.ClientConnInterface
}

func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
	return &greeterClient{cc}
}

func (c *greeterClient) SayGoodnight(ctx context.Context, in *GoodnightRequest, opts ...grpc.CallOption) (Greeter_SayGoodnightClient, error) {
	stream, err := c.cc.NewStream(ctx, &_Greeter_serviceDesc.Streams[0], "/goodnightworld.Greeter/SayGoodnight", opts...)
	if err != nil {
		return nil, err
	}
	x := &greeterSayGoodnightClient{stream}
	if err := x.ClientStream.SendMsg(in); err != nil {
		return nil, err
	}
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	return x, nil
}

type Greeter_SayGoodnightClient interface {
	Recv() (*GoodnightReply, error)
	grpc.ClientStream
}

type greeterSayGoodnightClient struct {
	grpc.ClientStream
}

func (x *greeterSayGoodnightClient) Recv() (*GoodnightReply, error) {
	m := new(GoodnightReply)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

// GreeterServer is the server API for Greeter service.
type GreeterServer interface {
	// Sends a greeting
	SayGoodnight(*GoodnightRequest, Greeter_SayGoodnightServer) error
}

// UnimplementedGreeterServer can be embedded to have forward compatible implementations.
type UnimplementedGreeterServer struct {
}

func (*UnimplementedGreeterServer) SayGoodnight(req *GoodnightRequest, srv Greeter_SayGoodnightServer) error {
	return status.Errorf(codes.Unimplemented, "method SayGoodnight not implemented")
}

func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
	s.RegisterService(&_Greeter_serviceDesc, srv)
}

func _Greeter_SayGoodnight_Handler(srv interface{}, stream grpc.ServerStream) error {
	m := new(GoodnightRequest)
	if err := stream.RecvMsg(m); err != nil {
		return err
	}
	return srv.(GreeterServer).SayGoodnight(m, &greeterSayGoodnightServer{stream})
}

type Greeter_SayGoodnightServer interface {
	Send(*GoodnightReply) error
	grpc.ServerStream
}

type greeterSayGoodnightServer struct {
	grpc.ServerStream
}

func (x *greeterSayGoodnightServer) Send(m *GoodnightReply) error {
	return x.ServerStream.SendMsg(m)
}

var _Greeter_serviceDesc = grpc.ServiceDesc{
	ServiceName: "goodnightworld.Greeter",
	HandlerType: (*GreeterServer)(nil),
	Methods:     []grpc.MethodDesc{},
	Streams: []grpc.StreamDesc{
		{
			StreamName:    "SayGoodnight",
			Handler:       _Greeter_SayGoodnight_Handler,
			ServerStreams: true,
		},
	},
	Metadata: "goodnightworld.proto",
}

今回プログラムの概要は以下です。

  • クライアントは名前をサーバに一度だけ送信する

  • サーバは受け取った名前を使って挨拶を組み立てて3回連続で返却する

    • デバッグで標準出力にも表示する

  • クライアントはサーバから受け取ったメッセージを標準出力に表示する

実用性は全くありません。

grpc-go-example/goodnightworld/greeter_server/main.go
package main

import (
	"log"
	"net"

	pb "github.com/righ/grpc-go-example/goodnightworld/goodnightworld"
	"google.golang.org/grpc"
)

const (
	port = ":50051"
)

// server is used to implement goodnightworld.GreeterServer.
type server struct {
	pb.UnimplementedGreeterServer
}

// SayGoodnight implements goodnightworld.GreeterServer
func (s *server) SayGoodnight(in *pb.GoodnightRequest, srv pb.Greeter_SayGoodnightServer) error {
	log.Printf("Received: %s", in.GetName())
	srv.Send(&pb.GoodnightReply{Message: "Good night " + in.GetName() + "!"})
	srv.Send(&pb.GoodnightReply{Message: "Good night " + in.GetName() + "!"})
	srv.Send(&pb.GoodnightReply{Message: "Good night " + in.GetName() + "!"})
	return nil
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

grpc-go-example/goodnightworld/greeter_client/main.go
package main

import (
	"context"
	"io"
	"log"
	"time"

	pb "github.com/righ/grpc-go-example/goodnightworld/goodnightworld"
	"google.golang.org/grpc"
)

const (
	address = "grpc-server:50051"
)

func main() {
	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	cli, err := c.SayGoodnight(ctx, &pb.GoodnightRequest{Name: "righ"})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
		return
	}
	for {
		reply, err := cli.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("could not greet: %v", err)
		}
		log.Println(reply.Message)
	}
}

Server streaming RPC の サーバ側のRPCメソッド は Unary RPC とは少し違い、リクエストパラメータは第1引数で受け取ります。レスポンスは第2引数のServer構造体(以下 srv という)の srv.Send メソッドを使って複数返却できます。

ctx はどこいったの?となりますが srv.Context に入っています。

Server streaming PRC の クライアント側のRPCメソッドも Unary RPC とは違います。具体的にはパラメータは引数として受け取らず、メソッドの返却値としてレスポンスではなくRPCのクライアントが返却されます。(以下 cli という)

cli.Send メソッドを使いリクエストを送信し(パラメータはここで指定する)、 cli.Recv メソッドを使って res, err := cli.Recv() のようにレスポンスを抽出します。 レスポンスは複数受け取ることになるためループの中で受け取ることになりますが、何らかの形で終了を検知しなければなりません。

これは errio.EOF と一致するかどうかで判定できるので検知したらループを抜けます。

~/goodnightworld/greeter_server# go run .

~/goodnightworld/greeter_client# go run .

2020/02/01 08:13:23 Received: name:"righ"
2020/02/01 08:13:23 Good night righ!
2020/02/01 08:13:23 Good night righ!
2020/02/01 08:13:23 Good night righ!
blockdiag clientserverrighGood night righ!Good night righ!Good night righ!

🌛 Client streaming RPC

複数(M)のリクエストに対して、1つのレスポンスを返します。

Client streaming RPC - gRPC

まずはproto定義と自動生成ファイル。 リクエストにだけ stream が指定されています。

grpc-go-example/goodmorningworld/goodmorningworld/goodmorningworld.proto
syntax = "proto3";

package goodmorningworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayGoodmorning (stream GoodmorningRequest) returns (GoodmorningReply) {}
}

// The request message containing the user's name.
message GoodmorningRequest {
  string name = 1;
}

// The response message containing the greetings
message GoodmorningReply {
  string message = 1;
}

grpc-go-example/goodmorningworld/goodmorningworld/goodmorningworld.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: goodmorningworld.proto

package goodmorningworld

import (
	context "context"
	fmt "fmt"
	proto "github.com/golang/protobuf/proto"
	grpc "google.golang.org/grpc"
	codes "google.golang.org/grpc/codes"
	status "google.golang.org/grpc/status"
	math "math"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

// The request message containing the user's name.
type GoodmorningRequest struct {
	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *GoodmorningRequest) Reset()         { *m = GoodmorningRequest{} }
func (m *GoodmorningRequest) String() string { return proto.CompactTextString(m) }
func (*GoodmorningRequest) ProtoMessage()    {}
func (*GoodmorningRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_d4e0dee62205cbf6, []int{0}
}

func (m *GoodmorningRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_GoodmorningRequest.Unmarshal(m, b)
}
func (m *GoodmorningRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_GoodmorningRequest.Marshal(b, m, deterministic)
}
func (m *GoodmorningRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_GoodmorningRequest.Merge(m, src)
}
func (m *GoodmorningRequest) XXX_Size() int {
	return xxx_messageInfo_GoodmorningRequest.Size(m)
}
func (m *GoodmorningRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_GoodmorningRequest.DiscardUnknown(m)
}

var xxx_messageInfo_GoodmorningRequest proto.InternalMessageInfo

func (m *GoodmorningRequest) GetName() string {
	if m != nil {
		return m.Name
	}
	return ""
}

// The response message containing the greetings
type GoodmorningReply struct {
	Message              string   `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *GoodmorningReply) Reset()         { *m = GoodmorningReply{} }
func (m *GoodmorningReply) String() string { return proto.CompactTextString(m) }
func (*GoodmorningReply) ProtoMessage()    {}
func (*GoodmorningReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_d4e0dee62205cbf6, []int{1}
}

func (m *GoodmorningReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_GoodmorningReply.Unmarshal(m, b)
}
func (m *GoodmorningReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_GoodmorningReply.Marshal(b, m, deterministic)
}
func (m *GoodmorningReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_GoodmorningReply.Merge(m, src)
}
func (m *GoodmorningReply) XXX_Size() int {
	return xxx_messageInfo_GoodmorningReply.Size(m)
}
func (m *GoodmorningReply) XXX_DiscardUnknown() {
	xxx_messageInfo_GoodmorningReply.DiscardUnknown(m)
}

var xxx_messageInfo_GoodmorningReply proto.InternalMessageInfo

func (m *GoodmorningReply) GetMessage() string {
	if m != nil {
		return m.Message
	}
	return ""
}

func init() {
	proto.RegisterType((*GoodmorningRequest)(nil), "goodmorningworld.GoodmorningRequest")
	proto.RegisterType((*GoodmorningReply)(nil), "goodmorningworld.GoodmorningReply")
}

func init() { proto.RegisterFile("goodmorningworld.proto", fileDescriptor_d4e0dee62205cbf6) }

var fileDescriptor_d4e0dee62205cbf6 = []byte{
	// 150 bytes of a gzipped FileDescriptorProto
	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4b, 0xcf, 0xcf, 0x4f,
	0xc9, 0xcd, 0x2f, 0xca, 0xcb, 0xcc, 0x4b, 0x2f, 0xcf, 0x2f, 0xca, 0x49, 0xd1, 0x2b, 0x28, 0xca,
	0x2f, 0xc9, 0x17, 0x12, 0x40, 0x17, 0x57, 0xd2, 0xe0, 0x12, 0x72, 0x47, 0x88, 0x05, 0xa5, 0x16,
	0x96, 0xa6, 0x16, 0x97, 0x08, 0x09, 0x71, 0xb1, 0xe4, 0x25, 0xe6, 0xa6, 0x4a, 0x30, 0x2a, 0x30,
	0x6a, 0x70, 0x06, 0x81, 0xd9, 0x4a, 0x3a, 0x5c, 0x02, 0x28, 0x2a, 0x0b, 0x72, 0x2a, 0x85, 0x24,
	0xb8, 0xd8, 0x73, 0x53, 0x8b, 0x8b, 0x13, 0xd3, 0x61, 0x4a, 0x61, 0x5c, 0xa3, 0x4c, 0x2e, 0x76,
	0xf7, 0xa2, 0xd4, 0xd4, 0x92, 0xd4, 0x22, 0xa1, 0x38, 0x2e, 0xbe, 0xe0, 0xc4, 0x4a, 0x24, 0xbd,
	0x42, 0x2a, 0x7a, 0x18, 0xee, 0xc3, 0x74, 0x84, 0x94, 0x12, 0x01, 0x55, 0x05, 0x39, 0x95, 0x4a,
	0x0c, 0x1a, 0x8c, 0x49, 0x6c, 0x60, 0xbf, 0x19, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x89, 0x33,
	0x30, 0x81, 0xf5, 0x00, 0x00, 0x00,
}

// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6

// GreeterClient is the client API for Greeter service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type GreeterClient interface {
	// Sends a greeting
	SayGoodmorning(ctx context.Context, opts ...grpc.CallOption) (Greeter_SayGoodmorningClient, error)
}

type greeterClient struct {
	cc grpc.ClientConnInterface
}

func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
	return &greeterClient{cc}
}

func (c *greeterClient) SayGoodmorning(ctx context.Context, opts ...grpc.CallOption) (Greeter_SayGoodmorningClient, error) {
	stream, err := c.cc.NewStream(ctx, &_Greeter_serviceDesc.Streams[0], "/goodmorningworld.Greeter/SayGoodmorning", opts...)
	if err != nil {
		return nil, err
	}
	x := &greeterSayGoodmorningClient{stream}
	return x, nil
}

type Greeter_SayGoodmorningClient interface {
	Send(*GoodmorningRequest) error
	CloseAndRecv() (*GoodmorningReply, error)
	grpc.ClientStream
}

type greeterSayGoodmorningClient struct {
	grpc.ClientStream
}

func (x *greeterSayGoodmorningClient) Send(m *GoodmorningRequest) error {
	return x.ClientStream.SendMsg(m)
}

func (x *greeterSayGoodmorningClient) CloseAndRecv() (*GoodmorningReply, error) {
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	m := new(GoodmorningReply)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

// GreeterServer is the server API for Greeter service.
type GreeterServer interface {
	// Sends a greeting
	SayGoodmorning(Greeter_SayGoodmorningServer) error
}

// UnimplementedGreeterServer can be embedded to have forward compatible implementations.
type UnimplementedGreeterServer struct {
}

func (*UnimplementedGreeterServer) SayGoodmorning(srv Greeter_SayGoodmorningServer) error {
	return status.Errorf(codes.Unimplemented, "method SayGoodmorning not implemented")
}

func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
	s.RegisterService(&_Greeter_serviceDesc, srv)
}

func _Greeter_SayGoodmorning_Handler(srv interface{}, stream grpc.ServerStream) error {
	return srv.(GreeterServer).SayGoodmorning(&greeterSayGoodmorningServer{stream})
}

type Greeter_SayGoodmorningServer interface {
	SendAndClose(*GoodmorningReply) error
	Recv() (*GoodmorningRequest, error)
	grpc.ServerStream
}

type greeterSayGoodmorningServer struct {
	grpc.ServerStream
}

func (x *greeterSayGoodmorningServer) SendAndClose(m *GoodmorningReply) error {
	return x.ServerStream.SendMsg(m)
}

func (x *greeterSayGoodmorningServer) Recv() (*GoodmorningRequest, error) {
	m := new(GoodmorningRequest)
	if err := x.ServerStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

var _Greeter_serviceDesc = grpc.ServiceDesc{
	ServiceName: "goodmorningworld.Greeter",
	HandlerType: (*GreeterServer)(nil),
	Methods:     []grpc.MethodDesc{},
	Streams: []grpc.StreamDesc{
		{
			StreamName:    "SayGoodmorning",
			Handler:       _Greeter_SayGoodmorning_Handler,
			ClientStreams: true,
		},
	},
	Metadata: "goodmorningworld.proto",
}

今回作成するプログラムの概要は以下です。

  • クライアントは標準入力から名前を受け取り、一つずつサーバに送信する

    • 空行が入力されるとクライアント側の入力は完了とする

  • サーバはクライアントから受け取った名前を使い挨拶を組み立ててクライアントに返却する

    • 受け取った名前をデバッグ用に標準出力に表示する

  • クライアントはサーバから受け取った挨拶を標準出力に表示する

実用性はない。

grpc-go-example/goodmorningworld/greeter_server/main.go
package main

import (
	"io"
	"log"
	"net"
	"strings"

	pb "github.com/righ/grpc-go-example/goodmorningworld/goodmorningworld"
	"google.golang.org/grpc"
)

const (
	port = ":50051"
)

// server is used to implement goodmorningworld.GreeterServer.
type server struct {
	pb.UnimplementedGreeterServer
}

// SayGoodmorning implements goodmorningworld.GreeterServer
func (s *server) SayGoodmorning(srv pb.Greeter_SayGoodmorningServer) error {
	names := []string{}
	for {
		req, err := srv.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		log.Printf("Received: %s", req)
		names = append(names, req.GetName())
	}
	message := strings.Join(names[:], ",")
	srv.SendAndClose(&pb.GoodmorningReply{Message: "Good morning " + message + "!"})
	return nil
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

grpc-go-example/goodmorningworld/greeter_client/main.go
package main

import (
	"bufio"
	"context"
	"fmt"
	"log"
	"os"
	"time"

	pb "github.com/righ/grpc-go-example/goodmorningworld/goodmorningworld"
	"google.golang.org/grpc"
)

const (
	address = "grpc-server:50051"
)

func main() {
	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	cli, err := c.SayGoodmorning(ctx)
	if err != nil {
		log.Fatalf("could not greet: %v", err)
		return
	}
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		name := scanner.Text()
		if name == "" {
			break
		}
		if err := cli.Send(&pb.GoodmorningRequest{Name: name}); err != nil {
			log.Fatalf("Send failed: %v", err)
		}
	}
	reply, err := cli.CloseAndRecv()
	if err != nil {
		log.Fatalf("could not greet: %v", err)
		return
	}
	fmt.Println(reply.GetMessage())
}

Client streaming RPC のサーバ側RPCメソッドは Server構造体(以下 srv という)を引数として、 srv.Recv メソッドを使い req, err := srv.Recv() のようにリクエストを抽出するように実装します。

リクエストは複数受け取ることになるため、先程のクライアントサイドと同様に errio.EOF のときにループを抜けるようにします。

レスポンスは srv.SendAndClose を使って返却します。

Client streaming PRC の クライアント側RPCメソッドはRPCのクライアントが返却されます。(以下 cli という)

cli.Send メソッドを使いリクエストを送信し(パラメータはここで指定する)、 cli.CloseAndRecv メソッドを使って res, err := cli.CloseAndRecv() のようにサーバからのレスポンスを抽出し、通信を終了します。

~/goodmorningworld/greeter_server# go run .

~/goodmorningworld/greeter_client# go run .

2020/02/01 08:50:44 Received: name:"dj"
2020/02/01 08:50:44 Received: name:"steph"
2020/02/01 08:50:44 Received: name:"michelle"
root@98337ba2f5a0:~/goodmorningworld/greeter_client# go run .
dj
steph
michelle

Good morning dj,steph,michelle!
blockdiag clientserverdjstephmichelleGood morning dj,steph,michelle!

🌝 Bidirectional streaming RPC

複数(N)のリクエストに対して、複数(M)のレスポンスを返すことで双方向の通信を実現します。

Bidirectional streaming RPC - gRPC

まずはproto定義と自動生成ファイル。

RPCのリクエスト・レスポンスともに stream を指定しました。

grpc-go-example/chat/chat/chat.proto
syntax = "proto3";

package chat;

service Chat {
  rpc Talk (stream MessageRequest) returns (stream MessageReply) {}
}

message MessageRequest {
  string name = 1;
  string message = 2;
}

message MessageReply {
  string name = 1;
  string message = 2;
}

grpc-go-example/chat/chat/chat.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: chat.proto

package chat

import (
	context "context"
	fmt "fmt"
	proto "github.com/golang/protobuf/proto"
	grpc "google.golang.org/grpc"
	codes "google.golang.org/grpc/codes"
	status "google.golang.org/grpc/status"
	math "math"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

type MessageRequest struct {
	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Message              string   `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *MessageRequest) Reset()         { *m = MessageRequest{} }
func (m *MessageRequest) String() string { return proto.CompactTextString(m) }
func (*MessageRequest) ProtoMessage()    {}
func (*MessageRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_8c585a45e2093e54, []int{0}
}

func (m *MessageRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_MessageRequest.Unmarshal(m, b)
}
func (m *MessageRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_MessageRequest.Marshal(b, m, deterministic)
}
func (m *MessageRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_MessageRequest.Merge(m, src)
}
func (m *MessageRequest) XXX_Size() int {
	return xxx_messageInfo_MessageRequest.Size(m)
}
func (m *MessageRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_MessageRequest.DiscardUnknown(m)
}

var xxx_messageInfo_MessageRequest proto.InternalMessageInfo

func (m *MessageRequest) GetName() string {
	if m != nil {
		return m.Name
	}
	return ""
}

func (m *MessageRequest) GetMessage() string {
	if m != nil {
		return m.Message
	}
	return ""
}

type MessageReply struct {
	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Message              string   `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *MessageReply) Reset()         { *m = MessageReply{} }
func (m *MessageReply) String() string { return proto.CompactTextString(m) }
func (*MessageReply) ProtoMessage()    {}
func (*MessageReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_8c585a45e2093e54, []int{1}
}

func (m *MessageReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_MessageReply.Unmarshal(m, b)
}
func (m *MessageReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_MessageReply.Marshal(b, m, deterministic)
}
func (m *MessageReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_MessageReply.Merge(m, src)
}
func (m *MessageReply) XXX_Size() int {
	return xxx_messageInfo_MessageReply.Size(m)
}
func (m *MessageReply) XXX_DiscardUnknown() {
	xxx_messageInfo_MessageReply.DiscardUnknown(m)
}

var xxx_messageInfo_MessageReply proto.InternalMessageInfo

func (m *MessageReply) GetName() string {
	if m != nil {
		return m.Name
	}
	return ""
}

func (m *MessageReply) GetMessage() string {
	if m != nil {
		return m.Message
	}
	return ""
}

func init() {
	proto.RegisterType((*MessageRequest)(nil), "chat.MessageRequest")
	proto.RegisterType((*MessageReply)(nil), "chat.MessageReply")
}

func init() { proto.RegisterFile("chat.proto", fileDescriptor_8c585a45e2093e54) }

var fileDescriptor_8c585a45e2093e54 = []byte{
	// 140 bytes of a gzipped FileDescriptorProto
	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4a, 0xce, 0x48, 0x2c,
	0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x01, 0xb1, 0x95, 0xec, 0xb8, 0xf8, 0x7c, 0x53,
	0x8b, 0x8b, 0x13, 0xd3, 0x53, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84, 0x84, 0xb8, 0x58,
	0xf2, 0x12, 0x73, 0x53, 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, 0xc0, 0x6c, 0x21, 0x09, 0x2e,
	0xf6, 0x5c, 0x88, 0x2a, 0x09, 0x26, 0xb0, 0x30, 0x8c, 0xab, 0x64, 0xc3, 0xc5, 0x03, 0xd7, 0x5f,
	0x90, 0x53, 0x49, 0x9a, 0x6e, 0x23, 0x3b, 0x2e, 0x16, 0xe7, 0x8c, 0xc4, 0x12, 0x21, 0x33, 0x2e,
	0x96, 0x90, 0xc4, 0x9c, 0x6c, 0x21, 0x11, 0x3d, 0xb0, 0x03, 0x51, 0x5d, 0x24, 0x25, 0x84, 0x26,
	0x5a, 0x90, 0x53, 0xa9, 0xc4, 0xa0, 0xc1, 0x68, 0xc0, 0x98, 0xc4, 0x06, 0xf6, 0x8a, 0x31, 0x20,
	0x00, 0x00, 0xff, 0xff, 0x23, 0x97, 0x5d, 0x87, 0xd8, 0x00, 0x00, 0x00,
}

// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6

// ChatClient is the client API for Chat service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type ChatClient interface {
	Talk(ctx context.Context, opts ...grpc.CallOption) (Chat_TalkClient, error)
}

type chatClient struct {
	cc grpc.ClientConnInterface
}

func NewChatClient(cc grpc.ClientConnInterface) ChatClient {
	return &chatClient{cc}
}

func (c *chatClient) Talk(ctx context.Context, opts ...grpc.CallOption) (Chat_TalkClient, error) {
	stream, err := c.cc.NewStream(ctx, &_Chat_serviceDesc.Streams[0], "/chat.Chat/Talk", opts...)
	if err != nil {
		return nil, err
	}
	x := &chatTalkClient{stream}
	return x, nil
}

type Chat_TalkClient interface {
	Send(*MessageRequest) error
	Recv() (*MessageReply, error)
	grpc.ClientStream
}

type chatTalkClient struct {
	grpc.ClientStream
}

func (x *chatTalkClient) Send(m *MessageRequest) error {
	return x.ClientStream.SendMsg(m)
}

func (x *chatTalkClient) Recv() (*MessageReply, error) {
	m := new(MessageReply)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

// ChatServer is the server API for Chat service.
type ChatServer interface {
	Talk(Chat_TalkServer) error
}

// UnimplementedChatServer can be embedded to have forward compatible implementations.
type UnimplementedChatServer struct {
}

func (*UnimplementedChatServer) Talk(srv Chat_TalkServer) error {
	return status.Errorf(codes.Unimplemented, "method Talk not implemented")
}

func RegisterChatServer(s *grpc.Server, srv ChatServer) {
	s.RegisterService(&_Chat_serviceDesc, srv)
}

func _Chat_Talk_Handler(srv interface{}, stream grpc.ServerStream) error {
	return srv.(ChatServer).Talk(&chatTalkServer{stream})
}

type Chat_TalkServer interface {
	Send(*MessageReply) error
	Recv() (*MessageRequest, error)
	grpc.ServerStream
}

type chatTalkServer struct {
	grpc.ServerStream
}

func (x *chatTalkServer) Send(m *MessageReply) error {
	return x.ServerStream.SendMsg(m)
}

func (x *chatTalkServer) Recv() (*MessageRequest, error) {
	m := new(MessageRequest)
	if err := x.ServerStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

var _Chat_serviceDesc = grpc.ServiceDesc{
	ServiceName: "chat.Chat",
	HandlerType: (*ChatServer)(nil),
	Methods:     []grpc.MethodDesc{},
	Streams: []grpc.StreamDesc{
		{
			StreamName:    "Talk",
			Handler:       _Chat_Talk_Handler,
			ServerStreams: true,
			ClientStreams: true,
		},
	},
	Metadata: "chat.proto",
}

双方向ということですが、私の想像力が乏しいせいで例題がチャットくらいしか思い浮かばなかったのでかなり不完全ではありますがとりあえず作ってみました。

今回作成するプログラムの概要は以下のようになっています。

  • クライアントは 標準入力から受け取ったメッセージ自分の名前 をサーバに送信する

    • 自分の名前はクライアントプログラム開始時にオプションで指定する

  • サーバはクライアントから受信した 名前メッセージ受信時刻 とともに保持する

    • デバッグ用に標準出力にも表示する

  • サーバはクライアントからメッセージを受信するたびに、クライアントごとに保持している最終受信時刻より後に受け取ったメッセージをすべてクライアントに送信する

  • クライアントはサーバから受け取ったメッセージを標準出力に表示する

続いてプログラム。これまでの中では一番複雑ですが、せいぜい60行くらいです。

grpc-go-example/chat/chat_server/main.go
package main

import (
	"log"
	"net"
	"time"

	pb "github.com/righ/grpc-go-example/chat/chat"
	"google.golang.org/grpc"
)

const (
	port = ":50051"
)

type message struct {
	name string
	text string
	time time.Time
}

var messages = []message{}

type server struct {
	pb.UnimplementedChatServer
}

func (s *server) Talk(srv pb.Chat_TalkServer) error {
	lastRead := time.Now()

	for {
		req, err := srv.Recv()
		if err != nil {
			return err
		}
		name, msg := req.GetName(), req.GetMessage()
		log.Printf("%s>: %s", name, msg)

		for _, m := range messages {
			if m.time.After(lastRead) {
				srv.Send(&pb.MessageReply{Name: m.name, Message: m.text})
			}
		}
		lastRead = time.Now()
		messages = append(messages, message{name, msg, lastRead})
	}
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterChatServer(s, &server{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

grpc-go-example/chat/chat_client/main.go
package main

import (
	"bufio"
	"context"
	"flag"
	"io"
	"log"
	"os"

	pb "github.com/righ/grpc-go-example/chat/chat"
	"google.golang.org/grpc"
)

const (
	address = "grpc-server:50051"
)

func main() {
	var (
		name = flag.String("name", "noname", "user name")
	)
	flag.Parse()

	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewChatClient(conn)

	ctx := context.Background()
	cli, err := c.Talk(ctx)
	if err != nil {
		log.Fatalf("could not talk: %v", err)
		return
	}

	go func() {
		scanner := bufio.NewScanner(os.Stdin)
		for scanner.Scan() {
			message := scanner.Text()
			if message == "" {
				break
			}
			if err := cli.Send(&pb.MessageRequest{Name: *name, Message: message}); err != nil {
				log.Fatalf("Send failed: %v", err)
			}
		}
	}()
	for {
		reply, err := cli.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("could not chat: %v", err)
		}
		log.Printf("%s> %s", reply.Name, reply.Message)
	}
}

Bidirectional streaming RPC のサーバ側のRPCメソッドは Server構造体(以下 srv という)を受け取るように実装します。

リクエストを複数受け取るためループ内で srv.Recv メソッドを使い req, err := srv.Recv() のようにリクエストを抽出します。

レスポンスは srv.Send メソッドを使って複数返却できます。1個でもいいし、返却しなくても良いでしょう。

Bidirectional streaming PRC の クライアント側RPCメソッドはRPCのクライアントを返却します。(以下 cli という)

リクエストは複数送信できるためループ内で cli.Send メソッドを使い送信し(パラメータはここで指定する)、 レスポンスも複数受信するためループ内で cli.Recv メソッドを使って res, err := cli.Recv() のように抽出します。

今回は 「送信(入力)」と「受信」の両方を待ち受ける必要があるため、「送信(入力)」の方をゴルーチンでバックグラウンド実行させています。

多少チャットらしくするため、今回はクライアントを2つ起動して残念な会話を繰り広げてみます

~/chat/chat_server# go run .

~/chat/chat_client# go run . -name Karen

~/chat/chat_client# go run . -name Yoko

2020/02/01 07:36:46 Karen>: はじめまして
2020/02/01 07:36:49 Karen>: かれんです
2020/02/01 07:36:54 Yoko>: こんにちは
2020/02/01 07:36:59 Yoko>: 用高です
2020/02/01 07:37:41 Karen>: なんて呼んだらいいですか?
2020/02/01 07:37:52 Yoko>: かれんさん、いい名前ですね!
はじめまして
かれんです
なんて呼んだらいいですか?
2020/02/01 07:37:41 Yoko> こんにちは
2020/02/01 07:37:41 Yoko> 用高です
こんにちは
2020/02/01 07:36:54 Karen> はじめまして
2020/02/01 07:36:54 Karen> かれんです
用高です
かれんさん、いい名前ですね!
2020/02/01 07:37:52 Karen> なんて呼んだらいいですか?

(会話はここで途切れている

blockdiag client1serverclient2はじめましてかれんですこんにちわ用高ですなんて呼んだらいいですか?こんにちわ用高ですかれんさん、いい名前ですね!なんて呼んだらいいですか?

お気付きの通り、双方向通信ではあるもののリアルタイム通信ではないので使い勝手はお世辞にも良いとは言えません。

本来はメッセージが届いた時点でつながっている全クライアントにブロードキャストできたらよかったんですが、 gRPCは現状で 任意 のクライアントに対してデータを送信する機能を提供していません。

サーバはクライアントからRPC接続を受けるたびにゴルーチンを作り、該当するRPCメソッドはバックグラウンド実行されます。 ここで作られたゴルーチンは接続してきたクライアントしか知らないので、現実装のRPCでは「受信(srv.Recv)」が唯一のメッセージ同期のトリガーとなってしまっているわけです。

🌞 改良する

(このセクションはgRPCとはそこまで関係ないので読み飛ばしてOKです)

じゃあどうすればよいのか。 一言でいうと「送信と受信のRPCを分ける」が答えです。

今回は先ほどとは趣向を変えて、チャネルをメッセージキューのように使ってみようと思います。

というわけで「メッセージ送信RPC」「メッセージ受信RPC」「チャネル作成RPC」を用意します。

grpc-go-example/chat2/chat2/chat2.proto
syntax = "proto3";

package chat2;

service Chat {
  rpc SendMessages (stream MessageRequest) returns (Null) {}
  rpc GetMessages (User) returns (stream MessageReply) {}
  rpc CreateChannel (Null) returns (User) {}
}

message Null {};

message User {
  uint64 id = 1;
}

message MessageRequest {
  uint64 id = 1;
  string name = 2;
  string message = 3;
}

message MessageReply {
  string name = 1;
  string message = 2;
}

grpc-go-example/chat2/chat2/chat2.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: chat2.proto

package chat2

import (
	context "context"
	fmt "fmt"
	proto "github.com/golang/protobuf/proto"
	grpc "google.golang.org/grpc"
	codes "google.golang.org/grpc/codes"
	status "google.golang.org/grpc/status"
	math "math"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

type Null struct {
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *Null) Reset()         { *m = Null{} }
func (m *Null) String() string { return proto.CompactTextString(m) }
func (*Null) ProtoMessage()    {}
func (*Null) Descriptor() ([]byte, []int) {
	return fileDescriptor_acdb35e63791909c, []int{0}
}

func (m *Null) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_Null.Unmarshal(m, b)
}
func (m *Null) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_Null.Marshal(b, m, deterministic)
}
func (m *Null) XXX_Merge(src proto.Message) {
	xxx_messageInfo_Null.Merge(m, src)
}
func (m *Null) XXX_Size() int {
	return xxx_messageInfo_Null.Size(m)
}
func (m *Null) XXX_DiscardUnknown() {
	xxx_messageInfo_Null.DiscardUnknown(m)
}

var xxx_messageInfo_Null proto.InternalMessageInfo

type User struct {
	Id                   uint64   `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *User) Reset()         { *m = User{} }
func (m *User) String() string { return proto.CompactTextString(m) }
func (*User) ProtoMessage()    {}
func (*User) Descriptor() ([]byte, []int) {
	return fileDescriptor_acdb35e63791909c, []int{1}
}

func (m *User) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_User.Unmarshal(m, b)
}
func (m *User) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_User.Marshal(b, m, deterministic)
}
func (m *User) XXX_Merge(src proto.Message) {
	xxx_messageInfo_User.Merge(m, src)
}
func (m *User) XXX_Size() int {
	return xxx_messageInfo_User.Size(m)
}
func (m *User) XXX_DiscardUnknown() {
	xxx_messageInfo_User.DiscardUnknown(m)
}

var xxx_messageInfo_User proto.InternalMessageInfo

func (m *User) GetId() uint64 {
	if m != nil {
		return m.Id
	}
	return 0
}

type MessageRequest struct {
	Id                   uint64   `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
	Name                 string   `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
	Message              string   `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *MessageRequest) Reset()         { *m = MessageRequest{} }
func (m *MessageRequest) String() string { return proto.CompactTextString(m) }
func (*MessageRequest) ProtoMessage()    {}
func (*MessageRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_acdb35e63791909c, []int{2}
}

func (m *MessageRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_MessageRequest.Unmarshal(m, b)
}
func (m *MessageRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_MessageRequest.Marshal(b, m, deterministic)
}
func (m *MessageRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_MessageRequest.Merge(m, src)
}
func (m *MessageRequest) XXX_Size() int {
	return xxx_messageInfo_MessageRequest.Size(m)
}
func (m *MessageRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_MessageRequest.DiscardUnknown(m)
}

var xxx_messageInfo_MessageRequest proto.InternalMessageInfo

func (m *MessageRequest) GetId() uint64 {
	if m != nil {
		return m.Id
	}
	return 0
}

func (m *MessageRequest) GetName() string {
	if m != nil {
		return m.Name
	}
	return ""
}

func (m *MessageRequest) GetMessage() string {
	if m != nil {
		return m.Message
	}
	return ""
}

type MessageReply struct {
	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Message              string   `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *MessageReply) Reset()         { *m = MessageReply{} }
func (m *MessageReply) String() string { return proto.CompactTextString(m) }
func (*MessageReply) ProtoMessage()    {}
func (*MessageReply) Descriptor() ([]byte, []int) {
	return fileDescriptor_acdb35e63791909c, []int{3}
}

func (m *MessageReply) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_MessageReply.Unmarshal(m, b)
}
func (m *MessageReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_MessageReply.Marshal(b, m, deterministic)
}
func (m *MessageReply) XXX_Merge(src proto.Message) {
	xxx_messageInfo_MessageReply.Merge(m, src)
}
func (m *MessageReply) XXX_Size() int {
	return xxx_messageInfo_MessageReply.Size(m)
}
func (m *MessageReply) XXX_DiscardUnknown() {
	xxx_messageInfo_MessageReply.DiscardUnknown(m)
}

var xxx_messageInfo_MessageReply proto.InternalMessageInfo

func (m *MessageReply) GetName() string {
	if m != nil {
		return m.Name
	}
	return ""
}

func (m *MessageReply) GetMessage() string {
	if m != nil {
		return m.Message
	}
	return ""
}

func init() {
	proto.RegisterType((*Null)(nil), "chat2.Null")
	proto.RegisterType((*User)(nil), "chat2.User")
	proto.RegisterType((*MessageRequest)(nil), "chat2.MessageRequest")
	proto.RegisterType((*MessageReply)(nil), "chat2.MessageReply")
}

func init() { proto.RegisterFile("chat2.proto", fileDescriptor_acdb35e63791909c) }

var fileDescriptor_acdb35e63791909c = []byte{
	// 232 bytes of a gzipped FileDescriptorProto
	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xce, 0x48, 0x2c,
	0x31, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x73, 0x94, 0xd8, 0xb8, 0x58, 0xfc,
	0x4a, 0x73, 0x72, 0x94, 0xc4, 0xb8, 0x58, 0x42, 0x8b, 0x53, 0x8b, 0x84, 0xf8, 0xb8, 0x98, 0x32,
	0x53, 0x24, 0x18, 0x15, 0x18, 0x35, 0x58, 0x82, 0x98, 0x32, 0x53, 0x94, 0xfc, 0xb8, 0xf8, 0x7c,
	0x53, 0x8b, 0x8b, 0x13, 0xd3, 0x53, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0xd0, 0x55, 0x08,
	0x09, 0x71, 0xb1, 0xe4, 0x25, 0xe6, 0xa6, 0x4a, 0x30, 0x29, 0x30, 0x6a, 0x70, 0x06, 0x81, 0xd9,
	0x42, 0x12, 0x5c, 0xec, 0xb9, 0x10, 0x5d, 0x12, 0xcc, 0x60, 0x61, 0x18, 0x57, 0xc9, 0x86, 0x8b,
	0x07, 0x6e, 0x5e, 0x41, 0x4e, 0x25, 0x5c, 0x37, 0x23, 0x76, 0xdd, 0x4c, 0x28, 0xba, 0x8d, 0x16,
	0x30, 0x72, 0xb1, 0x38, 0x67, 0x24, 0x96, 0x08, 0x99, 0x71, 0xf1, 0x04, 0xa7, 0xe6, 0xa5, 0x40,
	0x8d, 0x2a, 0x16, 0x12, 0xd5, 0x83, 0xf8, 0x0d, 0xd5, 0xad, 0x52, 0xdc, 0x50, 0x61, 0xb0, 0x17,
	0x19, 0x34, 0x18, 0x85, 0x8c, 0xb9, 0xb8, 0xdd, 0x53, 0x4b, 0xe0, 0xda, 0x60, 0xf2, 0x20, 0xaf,
	0x4b, 0x09, 0xa3, 0x9b, 0x51, 0x90, 0x53, 0xa9, 0xc4, 0x60, 0xc0, 0x28, 0xa4, 0xcd, 0xc5, 0xeb,
	0x5c, 0x94, 0x9a, 0x58, 0x92, 0xea, 0x9c, 0x91, 0x98, 0x97, 0x97, 0x9a, 0x23, 0x84, 0x6c, 0xac,
	0x14, 0xb2, 0x19, 0x4a, 0x0c, 0x49, 0x6c, 0xe0, 0xe0, 0x35, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff,
	0xfa, 0x38, 0x52, 0xd5, 0x6d, 0x01, 0x00, 0x00,
}

// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6

// ChatClient is the client API for Chat service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type ChatClient interface {
	SendMessages(ctx context.Context, opts ...grpc.CallOption) (Chat_SendMessagesClient, error)
	GetMessages(ctx context.Context, in *User, opts ...grpc.CallOption) (Chat_GetMessagesClient, error)
	CreateChannel(ctx context.Context, in *Null, opts ...grpc.CallOption) (*User, error)
}

type chatClient struct {
	cc grpc.ClientConnInterface
}

func NewChatClient(cc grpc.ClientConnInterface) ChatClient {
	return &chatClient{cc}
}

func (c *chatClient) SendMessages(ctx context.Context, opts ...grpc.CallOption) (Chat_SendMessagesClient, error) {
	stream, err := c.cc.NewStream(ctx, &_Chat_serviceDesc.Streams[0], "/chat2.Chat/SendMessages", opts...)
	if err != nil {
		return nil, err
	}
	x := &chatSendMessagesClient{stream}
	return x, nil
}

type Chat_SendMessagesClient interface {
	Send(*MessageRequest) error
	CloseAndRecv() (*Null, error)
	grpc.ClientStream
}

type chatSendMessagesClient struct {
	grpc.ClientStream
}

func (x *chatSendMessagesClient) Send(m *MessageRequest) error {
	return x.ClientStream.SendMsg(m)
}

func (x *chatSendMessagesClient) CloseAndRecv() (*Null, error) {
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	m := new(Null)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

func (c *chatClient) GetMessages(ctx context.Context, in *User, opts ...grpc.CallOption) (Chat_GetMessagesClient, error) {
	stream, err := c.cc.NewStream(ctx, &_Chat_serviceDesc.Streams[1], "/chat2.Chat/GetMessages", opts...)
	if err != nil {
		return nil, err
	}
	x := &chatGetMessagesClient{stream}
	if err := x.ClientStream.SendMsg(in); err != nil {
		return nil, err
	}
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	return x, nil
}

type Chat_GetMessagesClient interface {
	Recv() (*MessageReply, error)
	grpc.ClientStream
}

type chatGetMessagesClient struct {
	grpc.ClientStream
}

func (x *chatGetMessagesClient) Recv() (*MessageReply, error) {
	m := new(MessageReply)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

func (c *chatClient) CreateChannel(ctx context.Context, in *Null, opts ...grpc.CallOption) (*User, error) {
	out := new(User)
	err := c.cc.Invoke(ctx, "/chat2.Chat/CreateChannel", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// ChatServer is the server API for Chat service.
type ChatServer interface {
	SendMessages(Chat_SendMessagesServer) error
	GetMessages(*User, Chat_GetMessagesServer) error
	CreateChannel(context.Context, *Null) (*User, error)
}

// UnimplementedChatServer can be embedded to have forward compatible implementations.
type UnimplementedChatServer struct {
}

func (*UnimplementedChatServer) SendMessages(srv Chat_SendMessagesServer) error {
	return status.Errorf(codes.Unimplemented, "method SendMessages not implemented")
}
func (*UnimplementedChatServer) GetMessages(req *User, srv Chat_GetMessagesServer) error {
	return status.Errorf(codes.Unimplemented, "method GetMessages not implemented")
}
func (*UnimplementedChatServer) CreateChannel(ctx context.Context, req *Null) (*User, error) {
	return nil, status.Errorf(codes.Unimplemented, "method CreateChannel not implemented")
}

func RegisterChatServer(s *grpc.Server, srv ChatServer) {
	s.RegisterService(&_Chat_serviceDesc, srv)
}

func _Chat_SendMessages_Handler(srv interface{}, stream grpc.ServerStream) error {
	return srv.(ChatServer).SendMessages(&chatSendMessagesServer{stream})
}

type Chat_SendMessagesServer interface {
	SendAndClose(*Null) error
	Recv() (*MessageRequest, error)
	grpc.ServerStream
}

type chatSendMessagesServer struct {
	grpc.ServerStream
}

func (x *chatSendMessagesServer) SendAndClose(m *Null) error {
	return x.ServerStream.SendMsg(m)
}

func (x *chatSendMessagesServer) Recv() (*MessageRequest, error) {
	m := new(MessageRequest)
	if err := x.ServerStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

func _Chat_GetMessages_Handler(srv interface{}, stream grpc.ServerStream) error {
	m := new(User)
	if err := stream.RecvMsg(m); err != nil {
		return err
	}
	return srv.(ChatServer).GetMessages(m, &chatGetMessagesServer{stream})
}

type Chat_GetMessagesServer interface {
	Send(*MessageReply) error
	grpc.ServerStream
}

type chatGetMessagesServer struct {
	grpc.ServerStream
}

func (x *chatGetMessagesServer) Send(m *MessageReply) error {
	return x.ServerStream.SendMsg(m)
}

func _Chat_CreateChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(Null)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(ChatServer).CreateChannel(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/chat2.Chat/CreateChannel",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(ChatServer).CreateChannel(ctx, req.(*Null))
	}
	return interceptor(ctx, in, info, handler)
}

var _Chat_serviceDesc = grpc.ServiceDesc{
	ServiceName: "chat2.Chat",
	HandlerType: (*ChatServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "CreateChannel",
			Handler:    _Chat_CreateChannel_Handler,
		},
	},
	Streams: []grpc.StreamDesc{
		{
			StreamName:    "SendMessages",
			Handler:       _Chat_SendMessages_Handler,
			ClientStreams: true,
		},
		{
			StreamName:    "GetMessages",
			Handler:       _Chat_GetMessages_Handler,
			ServerStreams: true,
		},
	},
	Metadata: "chat2.proto",
}

今回のプログラムの概要は以下のようになっています。

  • クライアントは「チャネル作成RPC(Unary RPC)」経由でサーバにアクセスする

    • サーバ側はIDとチャネルの組を作成し、クライアントにIDを返却する

  • クライアントは標準入力から入力されたメッセージと名前を ID とともに「メッセージ送信RPC(Client streaming RPC)」に送信する

    • サーバは受け取った ID紐付かない チャネル全てに対し、受け取った「名前とメッセージ」を送信する

      • このときクライアントには何も返却しない

  • クライアントは「メッセージ受信RPC(Server streaming RPC)」に ID を送信し、サーバからメッセージを受信するたびに標準出力に表示する

    • この処理はクライアント側でもバックグラウンド実行 され続ける

    • サーバは受け取った ID紐づく チャネルからメッセージを取り出すたびにクライアントに送信する

grpc-go-example/chat2/chat_server/main.go
package main

import (
	"context"
	"io"
	"log"
	"net"
	"time"

	pb "github.com/righ/grpc-go-example/chat2/chat2"
	"google.golang.org/grpc"
)

const (
	port = ":50051"
)

type message struct {
	name string
	text string
	time time.Time
}

var serial = 1
var channels = make(map[uint64]chan message)

type server struct {
	pb.UnimplementedChatServer
}

func (s *server) CreateChannel(_ context.Context, u *pb.Null) (*pb.User, error) {
	serial++
	channels[uint64(serial)] = make(chan message)
	return &pb.User{Id: uint64(serial)}, nil
}

func (s *server) SendMessages(srv pb.Chat_SendMessagesServer) error {
	for {
		req, err := srv.Recv()
		if err != nil {
			srv.SendAndClose(&pb.Null{})
			if err == io.EOF {
				break
			}
			return err
		}
		name, msg := req.GetName(), req.GetMessage()
		log.Printf("%s>: %s", name, msg)
		now := time.Now()

		for id, c := range channels {
			if id == req.Id {
				continue
			}
			go func(c chan message) { c <- message{name, msg, now} }(c)
		}
	}
	return nil
}

func (s *server) GetMessages(user *pb.User, srv pb.Chat_GetMessagesServer) error {
	for {
		m := <-channels[user.Id]
		if err := srv.Send(&pb.MessageReply{Name: m.name, Message: m.text}); err != nil {
			return err
		}
	}
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterChatServer(s, &server{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

grpc-go-example/chat2/chat_client/main.go
package main

import (
	"bufio"
	"context"
	"flag"
	"io"
	"log"
	"os"

	pb "github.com/righ/grpc-go-example/chat2/chat2"
	"google.golang.org/grpc"
)

const (
	address = "grpc-server:50051"
)

func main() {
	var (
		name = flag.String("name", "noname", "user name")
	)
	flag.Parse()

	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewChatClient(conn)
	ctx := context.Background()

	user, err := c.CreateChannel(ctx, &pb.Null{})
	if err != nil {
		log.Fatalf("could not send: %v", err)
		return
	}

	go func() {
		cli, err := c.GetMessages(context.Background(), &pb.User{Id: user.Id})
		if err != nil {
			log.Fatalf("could not get: %v", err)
			return
		}
		for {
			reply, err := cli.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				log.Fatalf("Recv failed: %v", err)
				return
			}
			log.Printf("%s> %s", reply.Name, reply.Message)
		}
	}()

	cli, err := c.SendMessages(ctx)
	if err != nil {
		log.Fatalf("could not send: %v", err)
		return
	}
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		message := scanner.Text()
		if message == "" {
			continue
		}
		if err := cli.Send(&pb.MessageRequest{Id: user.Id, Name: *name, Message: message}); err != nil {
			log.Fatalf("Send failed: %v", err)
		}
	}
	cli.CloseAndRecv()
}

今回のプログラムでは CreateChannel RPCにアクセスするたびにチャネルを作成し、それに対応する ID をクライアントに伝えます。いわばセッションIDのようなものです(セキュリティは度外視)。

クライアントはこのIDとメッセージを一緒にメッセージ送信RPCに送信し、受け取ったサーバはIDに紐付かない(つまり自分以外の)チャネルにメッセージを送信します。

クライアントはこれに並行してメッセージ受信RPCにアクセスしておきます。サーバ側のメッセージ受信RPCはチャネルにメッセージが送られてきたタイミングでクライアントにレスポンスを返却するため、リアルタイムでメッセージが送受信されているように見えるというわけですね。

ただし、このプログラムはリソース競合やメモリリークを考慮していないのでそのまま使うのはおすすめできません。

~/chat2/chat_server# go run .

~/chat2/chat_client# go run . -name Karen

~/chat2/chat_client# go run . -name Yoko

2020/02/01 16:58:21 Oji> かれんちゃん、オッハー😃♥ 😃✋😍ちょっと電話できるカナ😜⁉️✋❓❗❓水曜日、会社がお休みになった、よ(^з<)😆かれんちゃんは都合どうかな( ̄ー ̄?) ドライブ🚗どウ(^з<)😃♥ ナンチャッテ(^_^)(^o^)❗(笑)
2020/02/01 16:58:47 Karen> 水曜日は風邪引くから無理かも!
2020/02/01 16:59:16 Oji> くれぐれも体調に気をつけて( ̄▽ ̄)(^^;;🤑😤ゆっくり、身体休めテネ(笑)オヤスミナサイ🙂
2020/02/01 16:58:21 Oji> かれんちゃん、オッハー😃♥ 😃✋😍ちょっと電話できるカナ😜⁉️✋❓❗❓水曜日、会社がお休みになった、よ(^з<)😆かれんちゃんは都合どうかな( ̄ー ̄?)ドライブ🚗どウ(^з<)😃♥ ナンチャッテ(^_^)(^o^)❗(笑)
水曜日は風邪引くから無理かも!
2020/02/01 16:59:16 Oji> くれぐれも体調に気をつけて( ̄▽ ̄)(^^;;🤑😤ゆっくり、身体休めテネ(笑)オヤスミナサイ🙂
かれんちゃん、オッハー😃♥ 😃✋😍ちょっと電話できるカナ😜⁉️✋❓❗❓水曜日、会社がお休みになった、よ(^з<)😆かれんちゃんは都合どうかな( ̄ー ̄?)ドライブ🚗どウ(^з<)😃♥ ナンチャッテ(^_^)(^o^)❗(笑)
2020/02/01 16:58:47 Karen> 水曜日は風邪引くから無理かも!
くれぐれも体調に気をつけて( ̄▽ ̄)(^^;;🤑😤ゆっくり、身体休めテネ(笑)オヤスミナサイ🙂
blockdiag client2serverclient1かれんちゃん、オッハー😃♥ 😃✋😍ちょっと電話できるカナ😜⁉️✋❓❗❓水曜日、会社がお休みになった、よ(^з<)😆かれんちゃんは都合どうかな( ̄ー ̄?)ドライブ🚗どウ(^з<)😃♥ ナンチャッテ(^_^)(^o^)❗(笑)かれんちゃん、オッハー😃♥ 😃✋😍ちょっと電話できるカナ😜⁉️✋❓❗❓水曜日、会社がお休みになった、よ(^з<)😆かれんちゃんは都合どうかな( ̄ー ̄?)ドライブ🚗どウ(^з<)😃♥ ナンチャッテ(^_^)(^o^)❗(笑)水曜日は風邪引くから無理かも!水曜日は風邪引くから無理かも!くれぐれも体調に気をつけて( ̄▽ ̄)(^^;;🤑😤ゆっくり、身体休めテネ(笑)オヤスミナサイ🙂くれぐれも体調に気をつけて( ̄▽ ̄)(^^;;🤑😤ゆっくり、身体休めテネ(笑)オヤスミナサイ🙂

今回はちゃんと会話が噛み合っていますね!

👽 Meta data

メタデータは構造化されていない通信データです。

メインではない何らかの付加情報をやり取りするときに使います。 といってもあまり良い例が思い浮かびませんが、ログなどに記録されるタイムスタンプなどでしょうか。

既存のにメタデータを使ったexampleがあったのでそれを使います。(対象ホストだけ変更)

まずは定義から。この時点で変わったところはありません。

grpc-go-example/features/proto/echo/echo.proto
syntax = "proto3";

package grpc.examples.echo;

option go_package = "google.golang.org/grpc/examples/features/proto/echo";

// EchoRequest is the request for echo.
message EchoRequest {
  string message = 1;
}

// EchoResponse is the response for echo.
message EchoResponse {
  string message = 1;
}

// Echo is the echo service.
service Echo {
  // UnaryEcho is unary echo.
  rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
  // ServerStreamingEcho is server side streaming.
  rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
  // ClientStreamingEcho is client side streaming.
  rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
  // BidirectionalStreamingEcho is bidi streaming.
  rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
}

grpc-go-example/features/proto/echo/echo.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: echo.proto

package echo

import (
	context "context"
	fmt "fmt"
	proto "github.com/golang/protobuf/proto"
	grpc "google.golang.org/grpc"
	codes "google.golang.org/grpc/codes"
	status "google.golang.org/grpc/status"
	math "math"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

// EchoRequest is the request for echo.
type EchoRequest struct {
	Message              string   `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *EchoRequest) Reset()         { *m = EchoRequest{} }
func (m *EchoRequest) String() string { return proto.CompactTextString(m) }
func (*EchoRequest) ProtoMessage()    {}
func (*EchoRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_08134aea513e0001, []int{0}
}

func (m *EchoRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_EchoRequest.Unmarshal(m, b)
}
func (m *EchoRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_EchoRequest.Marshal(b, m, deterministic)
}
func (m *EchoRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_EchoRequest.Merge(m, src)
}
func (m *EchoRequest) XXX_Size() int {
	return xxx_messageInfo_EchoRequest.Size(m)
}
func (m *EchoRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_EchoRequest.DiscardUnknown(m)
}

var xxx_messageInfo_EchoRequest proto.InternalMessageInfo

func (m *EchoRequest) GetMessage() string {
	if m != nil {
		return m.Message
	}
	return ""
}

// EchoResponse is the response for echo.
type EchoResponse struct {
	Message              string   `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *EchoResponse) Reset()         { *m = EchoResponse{} }
func (m *EchoResponse) String() string { return proto.CompactTextString(m) }
func (*EchoResponse) ProtoMessage()    {}
func (*EchoResponse) Descriptor() ([]byte, []int) {
	return fileDescriptor_08134aea513e0001, []int{1}
}

func (m *EchoResponse) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_EchoResponse.Unmarshal(m, b)
}
func (m *EchoResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_EchoResponse.Marshal(b, m, deterministic)
}
func (m *EchoResponse) XXX_Merge(src proto.Message) {
	xxx_messageInfo_EchoResponse.Merge(m, src)
}
func (m *EchoResponse) XXX_Size() int {
	return xxx_messageInfo_EchoResponse.Size(m)
}
func (m *EchoResponse) XXX_DiscardUnknown() {
	xxx_messageInfo_EchoResponse.DiscardUnknown(m)
}

var xxx_messageInfo_EchoResponse proto.InternalMessageInfo

func (m *EchoResponse) GetMessage() string {
	if m != nil {
		return m.Message
	}
	return ""
}

func init() {
	proto.RegisterType((*EchoRequest)(nil), "grpc.examples.echo.EchoRequest")
	proto.RegisterType((*EchoResponse)(nil), "grpc.examples.echo.EchoResponse")
}

func init() { proto.RegisterFile("echo.proto", fileDescriptor_08134aea513e0001) }

var fileDescriptor_08134aea513e0001 = []byte{
	// 234 bytes of a gzipped FileDescriptorProto
	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x92, 0xb1, 0x4b, 0x03, 0x31,
	0x14, 0x87, 0x3d, 0x11, 0xa5, 0x4f, 0xa7, 0xb8, 0x94, 0x2e, 0x96, 0x5b, 0xbc, 0x29, 0x29, 0x16,
	0xff, 0x81, 0x8a, 0xbb, 0xb4, 0xb8, 0x88, 0x4b, 0x3c, 0x7f, 0xa6, 0x81, 0x5c, 0xde, 0xf9, 0x92,
	0x8a, 0xfe, 0xed, 0x2e, 0x92, 0x2b, 0x05, 0x41, 0xba, 0xd5, 0x2d, 0x8f, 0x7c, 0xef, 0xfb, 0x96,
	0x47, 0x84, 0x76, 0xcd, 0xba, 0x17, 0xce, 0xac, 0x94, 0x93, 0xbe, 0xd5, 0xf8, 0xb4, 0x5d, 0x1f,
	0x90, 0x74, 0xf9, 0xa9, 0xaf, 0xe9, 0xfc, 0xbe, 0x5d, 0xf3, 0x12, 0xef, 0x1b, 0xa4, 0xac, 0xc6,
	0x74, 0xd6, 0x21, 0x25, 0xeb, 0x30, 0xae, 0xa6, 0x55, 0x33, 0x5a, 0xee, 0xc6, 0xba, 0xa1, 0x8b,
	0x2d, 0x98, 0x7a, 0x8e, 0x09, 0xfb, 0xc9, 0x9b, 0xef, 0x63, 0x3a, 0x29, 0xa8, 0x7a, 0xa0, 0xd1,
	0x63, 0xb4, 0xf2, 0x35, 0x0c, 0x57, 0xfa, 0x6f, 0x5d, 0xff, 0x4a, 0x4f, 0xa6, 0xfb, 0x81, 0x6d,
	0xb2, 0x3e, 0x52, 0xcf, 0x74, 0xb9, 0x82, 0x7c, 0x40, 0x56, 0x59, 0x60, 0x3b, 0x1f, 0xdd, 0xc1,
	0xdc, 0xb3, 0xaa, 0xd8, 0xef, 0x82, 0x47, 0xcc, 0x87, 0xb7, 0x37, 0x95, 0x02, 0x4d, 0x16, 0xfe,
	0xd5, 0x0b, 0xda, 0xec, 0x39, 0xda, 0xf0, 0x1f, 0x91, 0x59, 0xb5, 0xb8, 0x7d, 0x9a, 0x3b, 0x66,
	0x17, 0xa0, 0x1d, 0x07, 0x1b, 0x9d, 0x66, 0x71, 0xa6, 0xac, 0x9a, 0xdd, 0xaa, 0x79, 0x83, 0xcd,
	0x1b, 0x41, 0x32, 0xc3, 0x59, 0x98, 0x62, 0x7a, 0x39, 0x1d, 0xde, 0xf3, 0x9f, 0x00, 0x00, 0x00,
	0xff, 0xff, 0x23, 0x14, 0x26, 0x96, 0x30, 0x02, 0x00, 0x00,
}

// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4

// EchoClient is the client API for Echo service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type EchoClient interface {
	// UnaryEcho is unary echo.
	UnaryEcho(ctx context.Context, in *EchoRequest, opts ...grpc.CallOption) (*EchoResponse, error)
	// ServerStreamingEcho is server side streaming.
	ServerStreamingEcho(ctx context.Context, in *EchoRequest, opts ...grpc.CallOption) (Echo_ServerStreamingEchoClient, error)
	// ClientStreamingEcho is client side streaming.
	ClientStreamingEcho(ctx context.Context, opts ...grpc.CallOption) (Echo_ClientStreamingEchoClient, error)
	// BidirectionalStreamingEcho is bidi streaming.
	BidirectionalStreamingEcho(ctx context.Context, opts ...grpc.CallOption) (Echo_BidirectionalStreamingEchoClient, error)
}

type echoClient struct {
	cc *grpc.ClientConn
}

func NewEchoClient(cc *grpc.ClientConn) EchoClient {
	return &echoClient{cc}
}

func (c *echoClient) UnaryEcho(ctx context.Context, in *EchoRequest, opts ...grpc.CallOption) (*EchoResponse, error) {
	out := new(EchoResponse)
	err := c.cc.Invoke(ctx, "/grpc.examples.echo.Echo/UnaryEcho", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

func (c *echoClient) ServerStreamingEcho(ctx context.Context, in *EchoRequest, opts ...grpc.CallOption) (Echo_ServerStreamingEchoClient, error) {
	stream, err := c.cc.NewStream(ctx, &_Echo_serviceDesc.Streams[0], "/grpc.examples.echo.Echo/ServerStreamingEcho", opts...)
	if err != nil {
		return nil, err
	}
	x := &echoServerStreamingEchoClient{stream}
	if err := x.ClientStream.SendMsg(in); err != nil {
		return nil, err
	}
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	return x, nil
}

type Echo_ServerStreamingEchoClient interface {
	Recv() (*EchoResponse, error)
	grpc.ClientStream
}

type echoServerStreamingEchoClient struct {
	grpc.ClientStream
}

func (x *echoServerStreamingEchoClient) Recv() (*EchoResponse, error) {
	m := new(EchoResponse)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

func (c *echoClient) ClientStreamingEcho(ctx context.Context, opts ...grpc.CallOption) (Echo_ClientStreamingEchoClient, error) {
	stream, err := c.cc.NewStream(ctx, &_Echo_serviceDesc.Streams[1], "/grpc.examples.echo.Echo/ClientStreamingEcho", opts...)
	if err != nil {
		return nil, err
	}
	x := &echoClientStreamingEchoClient{stream}
	return x, nil
}

type Echo_ClientStreamingEchoClient interface {
	Send(*EchoRequest) error
	CloseAndRecv() (*EchoResponse, error)
	grpc.ClientStream
}

type echoClientStreamingEchoClient struct {
	grpc.ClientStream
}

func (x *echoClientStreamingEchoClient) Send(m *EchoRequest) error {
	return x.ClientStream.SendMsg(m)
}

func (x *echoClientStreamingEchoClient) CloseAndRecv() (*EchoResponse, error) {
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	m := new(EchoResponse)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

func (c *echoClient) BidirectionalStreamingEcho(ctx context.Context, opts ...grpc.CallOption) (Echo_BidirectionalStreamingEchoClient, error) {
	stream, err := c.cc.NewStream(ctx, &_Echo_serviceDesc.Streams[2], "/grpc.examples.echo.Echo/BidirectionalStreamingEcho", opts...)
	if err != nil {
		return nil, err
	}
	x := &echoBidirectionalStreamingEchoClient{stream}
	return x, nil
}

type Echo_BidirectionalStreamingEchoClient interface {
	Send(*EchoRequest) error
	Recv() (*EchoResponse, error)
	grpc.ClientStream
}

type echoBidirectionalStreamingEchoClient struct {
	grpc.ClientStream
}

func (x *echoBidirectionalStreamingEchoClient) Send(m *EchoRequest) error {
	return x.ClientStream.SendMsg(m)
}

func (x *echoBidirectionalStreamingEchoClient) Recv() (*EchoResponse, error) {
	m := new(EchoResponse)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

// EchoServer is the server API for Echo service.
type EchoServer interface {
	// UnaryEcho is unary echo.
	UnaryEcho(context.Context, *EchoRequest) (*EchoResponse, error)
	// ServerStreamingEcho is server side streaming.
	ServerStreamingEcho(*EchoRequest, Echo_ServerStreamingEchoServer) error
	// ClientStreamingEcho is client side streaming.
	ClientStreamingEcho(Echo_ClientStreamingEchoServer) error
	// BidirectionalStreamingEcho is bidi streaming.
	BidirectionalStreamingEcho(Echo_BidirectionalStreamingEchoServer) error
}

// UnimplementedEchoServer can be embedded to have forward compatible implementations.
type UnimplementedEchoServer struct {
}

func (*UnimplementedEchoServer) UnaryEcho(ctx context.Context, req *EchoRequest) (*EchoResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method UnaryEcho not implemented")
}
func (*UnimplementedEchoServer) ServerStreamingEcho(req *EchoRequest, srv Echo_ServerStreamingEchoServer) error {
	return status.Errorf(codes.Unimplemented, "method ServerStreamingEcho not implemented")
}
func (*UnimplementedEchoServer) ClientStreamingEcho(srv Echo_ClientStreamingEchoServer) error {
	return status.Errorf(codes.Unimplemented, "method ClientStreamingEcho not implemented")
}
func (*UnimplementedEchoServer) BidirectionalStreamingEcho(srv Echo_BidirectionalStreamingEchoServer) error {
	return status.Errorf(codes.Unimplemented, "method BidirectionalStreamingEcho not implemented")
}

func RegisterEchoServer(s *grpc.Server, srv EchoServer) {
	s.RegisterService(&_Echo_serviceDesc, srv)
}

func _Echo_UnaryEcho_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(EchoRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(EchoServer).UnaryEcho(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/grpc.examples.echo.Echo/UnaryEcho",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(EchoServer).UnaryEcho(ctx, req.(*EchoRequest))
	}
	return interceptor(ctx, in, info, handler)
}

func _Echo_ServerStreamingEcho_Handler(srv interface{}, stream grpc.ServerStream) error {
	m := new(EchoRequest)
	if err := stream.RecvMsg(m); err != nil {
		return err
	}
	return srv.(EchoServer).ServerStreamingEcho(m, &echoServerStreamingEchoServer{stream})
}

type Echo_ServerStreamingEchoServer interface {
	Send(*EchoResponse) error
	grpc.ServerStream
}

type echoServerStreamingEchoServer struct {
	grpc.ServerStream
}

func (x *echoServerStreamingEchoServer) Send(m *EchoResponse) error {
	return x.ServerStream.SendMsg(m)
}

func _Echo_ClientStreamingEcho_Handler(srv interface{}, stream grpc.ServerStream) error {
	return srv.(EchoServer).ClientStreamingEcho(&echoClientStreamingEchoServer{stream})
}

type Echo_ClientStreamingEchoServer interface {
	SendAndClose(*EchoResponse) error
	Recv() (*EchoRequest, error)
	grpc.ServerStream
}

type echoClientStreamingEchoServer struct {
	grpc.ServerStream
}

func (x *echoClientStreamingEchoServer) SendAndClose(m *EchoResponse) error {
	return x.ServerStream.SendMsg(m)
}

func (x *echoClientStreamingEchoServer) Recv() (*EchoRequest, error) {
	m := new(EchoRequest)
	if err := x.ServerStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

func _Echo_BidirectionalStreamingEcho_Handler(srv interface{}, stream grpc.ServerStream) error {
	return srv.(EchoServer).BidirectionalStreamingEcho(&echoBidirectionalStreamingEchoServer{stream})
}

type Echo_BidirectionalStreamingEchoServer interface {
	Send(*EchoResponse) error
	Recv() (*EchoRequest, error)
	grpc.ServerStream
}

type echoBidirectionalStreamingEchoServer struct {
	grpc.ServerStream
}

func (x *echoBidirectionalStreamingEchoServer) Send(m *EchoResponse) error {
	return x.ServerStream.SendMsg(m)
}

func (x *echoBidirectionalStreamingEchoServer) Recv() (*EchoRequest, error) {
	m := new(EchoRequest)
	if err := x.ServerStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

var _Echo_serviceDesc = grpc.ServiceDesc{
	ServiceName: "grpc.examples.echo.Echo",
	HandlerType: (*EchoServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "UnaryEcho",
			Handler:    _Echo_UnaryEcho_Handler,
		},
	},
	Streams: []grpc.StreamDesc{
		{
			StreamName:    "ServerStreamingEcho",
			Handler:       _Echo_ServerStreamingEcho_Handler,
			ServerStreams: true,
		},
		{
			StreamName:    "ClientStreamingEcho",
			Handler:       _Echo_ClientStreamingEcho_Handler,
			ClientStreams: true,
		},
		{
			StreamName:    "BidirectionalStreamingEcho",
			Handler:       _Echo_BidirectionalStreamingEcho_Handler,
			ServerStreams: true,
			ClientStreams: true,
		},
	},
	Metadata: "echo.proto",
}

以下、プログラムです。 "google.golang.org/grpc/metadata" を使うのがミソですね。

grpc-go-example/features/metadata/server/main.go
package main

import (
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"

	pb "google.golang.org/grpc/examples/features/proto/echo"
)

var port = flag.Int("port", 50051, "the port to serve on")

const (
	timestampFormat = time.StampNano
	streamingCount  = 10
)

type server struct {
	pb.UnimplementedEchoServer
}

func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) {
	fmt.Printf("--- UnaryEcho ---\n")
	// Create trailer in defer to record function return time.
	defer func() {
		trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
		grpc.SetTrailer(ctx, trailer)
	}()

	// Read metadata from client.
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, status.Errorf(codes.DataLoss, "UnaryEcho: failed to get metadata")
	}
	if t, ok := md["timestamp"]; ok {
		fmt.Printf("timestamp from metadata:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	}

	// Create and send header.
	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
	grpc.SendHeader(ctx, header)

	fmt.Printf("request received: %v, sending echo\n", in)

	return &pb.EchoResponse{Message: in.Message}, nil
}

func (s *server) ServerStreamingEcho(in *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {
	fmt.Printf("--- ServerStreamingEcho ---\n")
	// Create trailer in defer to record function return time.
	defer func() {
		trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
		stream.SetTrailer(trailer)
	}()

	// Read metadata from client.
	md, ok := metadata.FromIncomingContext(stream.Context())
	if !ok {
		return status.Errorf(codes.DataLoss, "ServerStreamingEcho: failed to get metadata")
	}
	if t, ok := md["timestamp"]; ok {
		fmt.Printf("timestamp from metadata:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	}

	// Create and send header.
	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
	stream.SendHeader(header)

	fmt.Printf("request received: %v\n", in)

	// Read requests and send responses.
	for i := 0; i < streamingCount; i++ {
		fmt.Printf("echo message %v\n", in.Message)
		err := stream.Send(&pb.EchoResponse{Message: in.Message})
		if err != nil {
			return err
		}
	}
	return nil
}

func (s *server) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error {
	fmt.Printf("--- ClientStreamingEcho ---\n")
	// Create trailer in defer to record function return time.
	defer func() {
		trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
		stream.SetTrailer(trailer)
	}()

	// Read metadata from client.
	md, ok := metadata.FromIncomingContext(stream.Context())
	if !ok {
		return status.Errorf(codes.DataLoss, "ClientStreamingEcho: failed to get metadata")
	}
	if t, ok := md["timestamp"]; ok {
		fmt.Printf("timestamp from metadata:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	}

	// Create and send header.
	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
	stream.SendHeader(header)

	// Read requests and send responses.
	var message string
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			fmt.Printf("echo last received message\n")
			return stream.SendAndClose(&pb.EchoResponse{Message: message})
		}
		message = in.Message
		fmt.Printf("request received: %v, building echo\n", in)
		if err != nil {
			return err
		}
	}
}

func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
	fmt.Printf("--- BidirectionalStreamingEcho ---\n")
	// Create trailer in defer to record function return time.
	defer func() {
		trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
		stream.SetTrailer(trailer)
	}()

	// Read metadata from client.
	md, ok := metadata.FromIncomingContext(stream.Context())
	if !ok {
		return status.Errorf(codes.DataLoss, "BidirectionalStreamingEcho: failed to get metadata")
	}

	if t, ok := md["timestamp"]; ok {
		fmt.Printf("timestamp from metadata:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	}

	// Create and send header.
	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
	stream.SendHeader(header)

	// Read requests and send responses.
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		fmt.Printf("request received %v, sending echo\n", in)
		if err := stream.Send(&pb.EchoResponse{Message: in.Message}); err != nil {
			return err
		}
	}
}

func main() {
	flag.Parse()
	rand.Seed(time.Now().UnixNano())
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	fmt.Printf("server listening at %v\n", lis.Addr())

	s := grpc.NewServer()
	pb.RegisterEchoServer(s, &server{})
	s.Serve(lis)
}

grpc-go-example/features/metadata/client/main.go
package main

import (
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"time"

	"google.golang.org/grpc"
	pb "google.golang.org/grpc/examples/features/proto/echo"
	"google.golang.org/grpc/metadata"
)

var addr = flag.String("addr", "grpc-server:50051", "the address to connect to")

const (
	timestampFormat = time.StampNano // "Jan _2 15:04:05.000"
	streamingCount  = 10
)

func unaryCallWithMetadata(c pb.EchoClient, message string) {
	fmt.Printf("--- unary ---\n")
	// Create metadata and context.
	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
	ctx := metadata.NewOutgoingContext(context.Background(), md)

	// Make RPC using the context with the metadata.
	var header, trailer metadata.MD
	r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: message}, grpc.Header(&header), grpc.Trailer(&trailer))
	if err != nil {
		log.Fatalf("failed to call UnaryEcho: %v", err)
	}

	if t, ok := header["timestamp"]; ok {
		fmt.Printf("timestamp from header:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	} else {
		log.Fatal("timestamp expected but doesn't exist in header")
	}
	if l, ok := header["location"]; ok {
		fmt.Printf("location from header:\n")
		for i, e := range l {
			fmt.Printf(" %d. %s\n", i, e)
		}
	} else {
		log.Fatal("location expected but doesn't exist in header")
	}
	fmt.Printf("response:\n")
	fmt.Printf(" - %s\n", r.Message)

	if t, ok := trailer["timestamp"]; ok {
		fmt.Printf("timestamp from trailer:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	} else {
		log.Fatal("timestamp expected but doesn't exist in trailer")
	}
}

func serverStreamingWithMetadata(c pb.EchoClient, message string) {
	fmt.Printf("--- server streaming ---\n")
	// Create metadata and context.
	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
	ctx := metadata.NewOutgoingContext(context.Background(), md)

	// Make RPC using the context with the metadata.
	stream, err := c.ServerStreamingEcho(ctx, &pb.EchoRequest{Message: message})
	if err != nil {
		log.Fatalf("failed to call ServerStreamingEcho: %v", err)
	}

	// Read the header when the header arrives.
	header, err := stream.Header()
	if err != nil {
		log.Fatalf("failed to get header from stream: %v", err)
	}
	// Read metadata from server's header.
	if t, ok := header["timestamp"]; ok {
		fmt.Printf("timestamp from header:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	} else {
		log.Fatal("timestamp expected but doesn't exist in header")
	}
	if l, ok := header["location"]; ok {
		fmt.Printf("location from header:\n")
		for i, e := range l {
			fmt.Printf(" %d. %s\n", i, e)
		}
	} else {
		log.Fatal("location expected but doesn't exist in header")
	}

	// Read all the responses.
	var rpcStatus error
	fmt.Printf("response:\n")
	for {
		r, err := stream.Recv()
		if err != nil {
			rpcStatus = err
			break
		}
		fmt.Printf(" - %s\n", r.Message)
	}
	if rpcStatus != io.EOF {
		log.Fatalf("failed to finish server streaming: %v", rpcStatus)
	}

	// Read the trailer after the RPC is finished.
	trailer := stream.Trailer()
	// Read metadata from server's trailer.
	if t, ok := trailer["timestamp"]; ok {
		fmt.Printf("timestamp from trailer:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	} else {
		log.Fatal("timestamp expected but doesn't exist in trailer")
	}
}

func clientStreamWithMetadata(c pb.EchoClient, message string) {
	fmt.Printf("--- client streaming ---\n")
	// Create metadata and context.
	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
	ctx := metadata.NewOutgoingContext(context.Background(), md)

	// Make RPC using the context with the metadata.
	stream, err := c.ClientStreamingEcho(ctx)
	if err != nil {
		log.Fatalf("failed to call ClientStreamingEcho: %v\n", err)
	}

	// Read the header when the header arrives.
	header, err := stream.Header()
	if err != nil {
		log.Fatalf("failed to get header from stream: %v", err)
	}
	// Read metadata from server's header.
	if t, ok := header["timestamp"]; ok {
		fmt.Printf("timestamp from header:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	} else {
		log.Fatal("timestamp expected but doesn't exist in header")
	}
	if l, ok := header["location"]; ok {
		fmt.Printf("location from header:\n")
		for i, e := range l {
			fmt.Printf(" %d. %s\n", i, e)
		}
	} else {
		log.Fatal("location expected but doesn't exist in header")
	}

	// Send all requests to the server.
	for i := 0; i < streamingCount; i++ {
		if err := stream.Send(&pb.EchoRequest{Message: message}); err != nil {
			log.Fatalf("failed to send streaming: %v\n", err)
		}
	}

	// Read the response.
	r, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("failed to CloseAndRecv: %v\n", err)
	}
	fmt.Printf("response:\n")
	fmt.Printf(" - %s\n\n", r.Message)

	// Read the trailer after the RPC is finished.
	trailer := stream.Trailer()
	// Read metadata from server's trailer.
	if t, ok := trailer["timestamp"]; ok {
		fmt.Printf("timestamp from trailer:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	} else {
		log.Fatal("timestamp expected but doesn't exist in trailer")
	}
}

func bidirectionalWithMetadata(c pb.EchoClient, message string) {
	fmt.Printf("--- bidirectional ---\n")
	// Create metadata and context.
	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
	ctx := metadata.NewOutgoingContext(context.Background(), md)

	// Make RPC using the context with the metadata.
	stream, err := c.BidirectionalStreamingEcho(ctx)
	if err != nil {
		log.Fatalf("failed to call BidirectionalStreamingEcho: %v\n", err)
	}

	go func() {
		// Read the header when the header arrives.
		header, err := stream.Header()
		if err != nil {
			log.Fatalf("failed to get header from stream: %v", err)
		}
		// Read metadata from server's header.
		if t, ok := header["timestamp"]; ok {
			fmt.Printf("timestamp from header:\n")
			for i, e := range t {
				fmt.Printf(" %d. %s\n", i, e)
			}
		} else {
			log.Fatal("timestamp expected but doesn't exist in header")
		}
		if l, ok := header["location"]; ok {
			fmt.Printf("location from header:\n")
			for i, e := range l {
				fmt.Printf(" %d. %s\n", i, e)
			}
		} else {
			log.Fatal("location expected but doesn't exist in header")
		}

		// Send all requests to the server.
		for i := 0; i < streamingCount; i++ {
			if err := stream.Send(&pb.EchoRequest{Message: message}); err != nil {
				log.Fatalf("failed to send streaming: %v\n", err)
			}
		}
		stream.CloseSend()
	}()

	// Read all the responses.
	var rpcStatus error
	fmt.Printf("response:\n")
	for {
		r, err := stream.Recv()
		if err != nil {
			rpcStatus = err
			break
		}
		fmt.Printf(" - %s\n", r.Message)
	}
	if rpcStatus != io.EOF {
		log.Fatalf("failed to finish server streaming: %v", rpcStatus)
	}

	// Read the trailer after the RPC is finished.
	trailer := stream.Trailer()
	// Read metadata from server's trailer.
	if t, ok := trailer["timestamp"]; ok {
		fmt.Printf("timestamp from trailer:\n")
		for i, e := range t {
			fmt.Printf(" %d. %s\n", i, e)
		}
	} else {
		log.Fatal("timestamp expected but doesn't exist in trailer")
	}

}

const message = "this is examples/metadata"

func main() {
	flag.Parse()
	// Set up a connection to the server.
	conn, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	c := pb.NewEchoClient(conn)

	unaryCallWithMetadata(c, message)
	time.Sleep(1 * time.Second)

	serverStreamingWithMetadata(c, message)
	time.Sleep(1 * time.Second)

	clientStreamWithMetadata(c, message)
	time.Sleep(1 * time.Second)

	bidirectionalWithMetadata(c, message)
}

サーバはコンテキスト経由でメタデータを受け取ります。 md, ok := metadata.FromIncomingContext(ctx) のようにコンテキストからメタデータを取得します。メタデータはマップ型なので md["timestamp"] のように値を抽出します

クライアントに送信する場合は、 SendHeaderSetTrailer メソッドを使います。

レスポンスの前に送るメタデータが Header, あとに送るメタデータが Trailer ということらしいです。これは gRPC というより HTTP2 の用語です。 (gRPCにおけるmetadata、そしてそれを node.js client から取得する - Qiita)

クライアントからメタデータを送信するにはコンテキストを使います。 メタデータ自体は metadata.Pairs で作り、 metadata.NewOutgoingContext(context.Background(), md) でメタデータを紐付けたコンテキストを作成します。

受信する方法ですが、Unary RPC の場合は ...grpc.CallOption に複数指定します。 Headerを受け取るときは grpc.Header(&header), Trailer を受け取るときは grpc.Trailer(&trailer) のように metadata.MD 変数のアドレスを渡すことでサーバからのメタデータを受け取ります。

Streaming RPC の場合は stream.Header()stream.Trailer() のように得られます。

とりあえず使ってみます。

~/features/metadata/server# go run .

~/features/metadata/client# go run .

server listening at [::]:50051
--- UnaryEcho ---
timestamp from metadata:
 0. Feb  1 17:27:41.704179000
request received: message:"this is examples/metadata" , sending echo
--- ServerStreamingEcho ---
timestamp from metadata:
 0. Feb  1 17:27:42.761855500
request received: message:"this is examples/metadata"
echo message this is examples/metadata
echo message this is examples/metadata
echo message this is examples/metadata
echo message this is examples/metadata
echo message this is examples/metadata
echo message this is examples/metadata
echo message this is examples/metadata
echo message this is examples/metadata
echo message this is examples/metadata
echo message this is examples/metadata
--- ClientStreamingEcho ---
timestamp from metadata:
 0. Feb  1 17:27:43.764238600
request received: message:"this is examples/metadata" , building echo
request received: message:"this is examples/metadata" , building echo
request received: message:"this is examples/metadata" , building echo
request received: message:"this is examples/metadata" , building echo
request received: message:"this is examples/metadata" , building echo
request received: message:"this is examples/metadata" , building echo
request received: message:"this is examples/metadata" , building echo
request received: message:"this is examples/metadata" , building echo
request received: message:"this is examples/metadata" , building echo
request received: message:"this is examples/metadata" , building echo
echo last received message
--- BidirectionalStreamingEcho ---
timestamp from metadata:
 0. Feb  1 17:27:44.769518100
request received message:"this is examples/metadata" , sending echo
request received message:"this is examples/metadata" , sending echo
request received message:"this is examples/metadata" , sending echo
request received message:"this is examples/metadata" , sending echo
request received message:"this is examples/metadata" , sending echo
request received message:"this is examples/metadata" , sending echo
request received message:"this is examples/metadata" , sending echo
request received message:"this is examples/metadata" , sending echo
request received message:"this is examples/metadata" , sending echo
request received message:"this is examples/metadata" , sending echo
--- unary ---
timestamp from header:
 0. Feb  1 17:27:41.717621700
location from header:
 0. MTV
response:
 - this is examples/metadata
timestamp from trailer:
 0. Feb  1 17:27:41.717779600
--- server streaming ---
timestamp from header:
 0. Feb  1 17:27:42.763338200
location from header:
 0. MTV
response:
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
timestamp from trailer:
 0. Feb  1 17:27:42.763500200
--- client streaming ---
timestamp from header:
 0. Feb  1 17:27:43.764817300
location from header:
 0. MTV
response:
 - this is examples/metadata

timestamp from trailer:
 0. Feb  1 17:27:43.767922700
--- bidirectional ---
response:
timestamp from header:
 0. Feb  1 17:27:44.771782200
location from header:
 0. MTV
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
 - this is examples/metadata
timestamp from trailer:
 0. Feb  1 17:27:44.772841200

出力が多くてちょっと分かりづらいですが、 メタデータがやり取りできてるっぽいです。

使い分けを簡単にまとめ。ストリーミングは全部同じですが..

種別

サーバ

クライアント

Unary RPC

header

grpc.SetTrailer(ctx, trailer)

trailer

grpc.SendHeader(ctx, header)

c.UnaryEcho(ctx, &pb.EchoRequest{Message: message}, grpc.Header(&header), grpc.Trailer(&trailer))

Server streaming RPC

header

stream.SetTrailer(trailer)

trailer

stream.SendHeader(header)

header

header := stream.Header()

trailer

trailer := stream.Trailer()

Client streaming RPC

header

stream.SetTrailer(trailer)

trailer

stream.SendHeader(header)

header

header := stream.Header()

trailer

trailer := stream.Trailer()

Bidiretional streaming RPC

header

stream.SetTrailer(trailer)

trailer

stream.SendHeader(header)

header

header := stream.Header()

trailer

trailer := stream.Trailer()

流石に疲れたのでここまで

テストは別の記事にするかもしれません。(しれないかもしれません)

間違いがあったら優しく教えて下さい。

参考