こんにちは!エンジニアの大槻です。
Baseconnect ではたくさんのデータを保持していますが、単一のデータベースにそれらのデータを保存し、連携する各サービスからそのデータベースにアクセスする構成には、パフォーマンスその他多くの観点から問題があります。
それらの問題を解決するため、Baseconnect では複数のサービス間でデータを同期する方針をとっています。
データの同期には Amazon SQS や Amazon Kinesis、Apache Kafka などのメッセージングシステムを用いますが、ここで問題になるのが、メッセージングシステムを介して送受信されるデータ(メッセージ)の形式をどう管理するかという点です。
候補としては Apache Avro や JSON Schema を挙げていましたが、エルデンリングのロード中にふと「Protocol Buffers が適しているのでは」と思いついたので、息抜きに試してみました。
実装の流れ
1. スキーマを定義する
まずは Protocol Buffers を用いてメッセージスキーマを定義します。
events/example.proto
syntax = "proto3"; package events; option go_package = "github.com/ORGANIZATION/REPOSITORY/events"; message ExampleMessage { string id = 1; int64 timestamp = 2; string sentence = 3; }
2. コードを生成する
先ほど定義したメッセージスキーマを Golang その他のコードから扱うため、メッセージスキーマからコードを生成します。
$ protoc \ -I ./events \ --go_out ./events \ --go_opt module=github.com/ORGANIZATION/REPOSITORY/events \ ./events/*.proto
3. Kinesis Data Streams プロデューサーを作成する
先ほど Protocol Buffers で定義したスキーマからメッセージを作成し、メッセージングシステムにメッセージを送信します。
今回はメッセージングシステムに Kinesis Data Streams を用いますが、Amazon SQS など他のメッセージングシステムでも基本的な流れは変わりません。
cmd/producer/main.go
var client *kinesis.client // ... 省略 ... m := &events.ExampleMesssage{ Id: uuid.NewString(), Timestamp: time.Now().Unix(), Sentence: "The quick brown fox jumps over the lazy dog.", } d, _ := proto.Marshal(m) r := &kinesis.PutRecordInput{ Data: d, PartitionKey: aws.String(m.Id), StreamName: aws.String("STREAM_NAME"), } res, _ := client.PutRecord(context.TODO(), r) // ... 省略 ...
4. Kinesis Data Streams コンシューマーを作成する
最後にメッセージングシステムからメッセージを受信したら一連の流れは完了です。
cmd/consumer/main.go
var client *kinesis.client // ... 省略 ... res, _ := client.GetRecords(ctx, &kinesis.GetRecordsInput{ ShardIterator: i, }) for _, r := range res.Records { m := &events.ExampleMessage{} proto.Unmarshal(r.Data, m) fmt.Printf("%+v", m) }
まとめ
Protocol Buffers でスキーマを定義し、Kinesis Data Streams 経由でメッセージを送受信してみました。
今回はメッセージングシステムに Kinesis Data Streams、プログラミング言語に Golang を選択しましたが、他のメッセージングシステムやプログラミング言語でもほぼ同じ流れで実装できるかと思います。
Protocol Buffers は比較的馴染みがある、複数のプログラミング言語をサポートしている、後方互換性の担保をサポートしてくれるなど、メッセージングシステムと併用するうえで非常に便利な仕組みだと感じました。
今後はもっと実践的な内容で検証していきたいと思います。
Baseconnectではエンジニアメンバーを絶賛募集中です。 もしご興味をお持ちいただけた方がいらっしゃいましたら、下記のリンクよりぜひご覧いただけますと嬉しく思います!