gRPC のサービスとメソッドを作成する

作成者: James Newton-King

このドキュメントでは、C# で gRPC のサービスとメソッドを作成する方法について説明します。 ここでは、次の内容について説明します。

  • .proto ファイルでサービスとメソッドを定義する方法。
  • gRPC の C# ツールを使用して生成されるコード。
  • gRPC のサービスとメソッドの実装。

新しい gRPC サービスを作成する

C# での gRPC サービスでは、API 開発に対する gRPC のコントラクト優先アプローチが導入されました。 サービスとメッセージは、.proto ファイルで定義されます。 これで、C# ツールによって .proto ファイルからコードが生成されます。 サーバー側アセットでは、サービスごとの抽象基本データ型と、すべてのメッセージのクラスが生成されます。

次の .proto ファイル:

  • Greeter サービスを定義します。
  • Greeter サービスで SayHello 呼び出しを定義します。
  • SayHello では、HelloRequest メッセージを送信し、HelloReply メッセージを受信します
syntax = "proto3";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

C# ツールによって C# の GreeterBase 基本データ型が生成されます。

public abstract partial class GreeterBase
{
    public virtual Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        throw new RpcException(new Status(StatusCode.Unimplemented, ""));
    }
}

public class HelloRequest
{
    public string Name { get; set; }
}

public class HelloReply
{
    public string Message { get; set; }
}

既定では、生成された GreeterBase は何も行いません。 その仮想メソッド SayHello は、それを呼び出すすべてのクライアントに UNIMPLEMENTED エラーを返します。 サービスが役に立つようにするには、アプリで GreeterBase の具象実装を作成する必要があります。

public class GreeterService : GreeterBase
{
    public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        return Task.FromResult(new HelloReply { Message = $"Hello {request.Name}" });
    }
}

ServerCallContext は、サーバー側呼び出しのコンテキストを提供します。

サービスの実装は、アプリに登録されます。 ASP.NET Core の gRPC によってホストされているサービスは、MapGrpcService メソッドを使用してルーティングパイプラインに追加される必要があります。

app.MapGrpcService<GreeterService>();

詳細については、「ASP.NET Core を使用した gRPC サービス」を参照してください。

gRPC のメソッドを実装する

gRPC サービスには、さまざまな種類のメソッドを含めることができます。 サービスによってメッセージがどのように送受信されるかは、定義されているメソッドの種類によって異なります。 gRPC のメソッドの種類は次のとおりです。

  • 単項
  • サーバー ストリーミング。
  • クライアント ストリーミング
  • 双方向ストリーミング

ストリーミング呼び出しは、.proto ファイルで stream キーワードを使って指定します。 stream は、呼び出しの要求メッセージ、応答メッセージ、またはその両方に配置できます。

syntax = "proto3";

service ExampleService {
  // Unary
  rpc UnaryCall (ExampleRequest) returns (ExampleResponse);

  // Server streaming
  rpc StreamingFromServer (ExampleRequest) returns (stream ExampleResponse);

  // Client streaming
  rpc StreamingFromClient (stream ExampleRequest) returns (ExampleResponse);

  // Bi-directional streaming
  rpc StreamingBothWays (stream ExampleRequest) returns (stream ExampleResponse);
}

各呼び出し型には、異なるメソッド シグネチャがあります。 具象実装で抽象基本サービス型から生成されたメソッドをオーバーライドすると、正しい引数と戻り値の型が使用されるようになります。

単項メソッド

単項メソッドは、要求メッセージをパラメーターとして取得し、応答を返します。 単項呼び出しは、応答が返されたときに完了します。

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var response = new ExampleResponse();
    return Task.FromResult(response);
}

単項呼び出しは、Web API コントローラーでのアクションに最もよく似ています。 gRPC のメソッドとアクションとの重要な相違点の 1 つとして、gRPC のメソッドは要求の一部を別のメソッド引数にバインドできない点があります。 gRPC のメソッドには、受信要求データに対して常に 1 つのメッセージ引数があります。 要求メッセージにフィールドを追加することで、gRPC サービスに複数の値を送信できます。

message ExampleRequest {
    int32 pageIndex = 1;
    int32 pageSize = 2;
    bool isDescending = 3;
}

サーバー ストリーミング メソッド

サーバー ストリーミング メソッドは、要求メッセージをパラメーターとして取得します。 複数のメッセージは呼び出し元にストリーミングで戻すことができるため、応答メッセージの送信には responseStream.WriteAsync が使用されます。 メソッドが返されると、サーバー ストリーミングの呼び出しが完了します。

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    for (var i = 0; i < 5; i++)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

クライアントは、サーバー ストリーミング メソッドが開始された後に、追加のメッセージやデータを送信することはできません。 一部のストリーミング メソッドは、永久に実行するように設計されています。 連続ストリーミング メソッドの場合、クライアントは不要になった呼び出しをキャンセルできます。 キャンセルが発生すると、クライアントはサーバーにシグナルを送信し、ServerCallContext.CancellationToken が発生します。 CancellationToken トークンは、次を目的とする場合に非同期メソッドを使用してサーバーで使用する必要があります。

  • すべての非同期処理を、ストリーミング呼び出しと共にキャンセルする。
  • メソッドをすぐに終了する。
public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    while (!context.CancellationToken.IsCancellationRequested)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

クライアント ストリーミング メソッド

クライアント ストリーミング メソッドは、メッセージを受信するメソッド "なしで" 開始します。 requestStream パラメーターは、クライアントからメッセージを読み取るために使用されます。 応答メッセージが返されると、クライアント ストリーミングの呼び出しが完了します。

public override async Task<ExampleResponse> StreamingFromClient(
    IAsyncStreamReader<ExampleRequest> requestStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        // ...
    }
    return new ExampleResponse();
}

双方向ストリーミング メソッド

双方向ストリーミング メソッドは、メッセージを受信するメソッド "なしで" 開始します。 requestStream パラメーターは、クライアントからメッセージを読み取るために使用されます。 このメソッドでは、responseStream.WriteAsync を使用して、メッセージを送信することを選択できます。 双方向ストリーミング呼び出しは、メソッドが返されると完了します。

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        await responseStream.WriteAsync(new ExampleResponse());
    }
}

上記のコードでは次の操作が行われます。

  • 要求ごとに応答を送信します。
  • 双方向ストリーミングの基本的な使用方法です。

要求の読み取り、応答の同時送信など、より複雑なシナリオをサポートできます。

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    // Read requests in a background task.
    var readTask = Task.Run(async () =>
    {
        await foreach (var message in requestStream.ReadAllAsync())
        {
            // Process request.
        }
    });

    // Send responses until the client signals that it is complete.
    while (!readTask.IsCompleted)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

双方向ストリーミング メソッドでは、クライアントとサービスはいつでも相互にメッセージを送信できます。 双方向メソッドの最適な実装は、要件によって異なります。

gRPC 要求ヘッダーにアクセスする

要求メッセージは、クライアントが gRPC サービスにデータを送信する唯一の方法ではありません。 ヘッダー値は、ServerCallContext.RequestHeaders を使用してサービスで使用できます。

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var userAgent = context.RequestHeaders.GetValue("user-agent");
    // ...

    return Task.FromResult(new ExampleResponse());
}

gRPC ストリーミング メソッドを使用したマルチスレッド

複数のスレッドを使う gRPC ストリーミング メソッドの実装に関して、重要な考慮事項があります。

リーダーとライターのスレッド セーフ

IAsyncStreamReader<TMessage>IServerStreamWriter<TMessage> はそれぞれ、一度に 1 つのスレッドでのみ使用できます。 ストリーミング gRPC メソッドの場合、複数のスレッドが同時に requestStream.MoveNext() を使って新しいメッセージを読み取ることはできません。 また、複数のスレッドが同時に responseStream.WriteAsync(message) を使って新しいメッセージを書き込むことはできません。

複数のスレッドが gRPC メソッドと安全に対話できるようにする方法は、System.Threading.Channels によるプロデューサー/コンシューマー パターンを使用することです。

public override async Task DownloadResults(DataRequest request,
        IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
{
    var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5));

    var consumerTask = Task.Run(async () =>
    {
        // Consume messages from channel and write to response stream.
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            await responseStream.WriteAsync(message);
        }
    });

    var dataChunks = request.Value.Chunk(size: 10);

    // Write messages to channel from multiple threads.
    await Task.WhenAll(dataChunks.Select(
        async c =>
        {
            var message = new DataResult { BytesProcessed = c.Length };
            await channel.Writer.WriteAsync(message);
        }));

    // Complete writing and wait for consumer to complete.
    channel.Writer.Complete();
    await consumerTask;
}

上記の gRPC サーバー ストリーミング メソッドでは:

  • DataResult メッセージを生成および使用するための制限付きチャネルを作成します。
  • チャネルからメッセージを読み取り、それらを応答ストリームに書き込むタスクを開始します。
  • 複数のスレッドからチャネルにメッセージを書き込みます。

メモ

双方向ストリーミング メソッドは、引数として IAsyncStreamReader<TMessage>IServerStreamWriter<TMessage> を受け取ります。 これらの型を互いに個別のスレッドで使用しても、安全です。

呼び出し終了後の gRPC メソッドとの対話

gRPC メソッドが終了すると、サーバーでの gRPC 呼び出しが終了します。 gRPC メソッドに渡される次の引数を、呼び出しが終了した後に使用するのは、安全ではありません。

  • ServerCallContext
  • IAsyncStreamReader<TMessage>
  • IServerStreamWriter<TMessage>

gRPC メソッドがこれらの型を使用するバックグラウンド タスクを開始する場合は、gRPC メソッドが終了する前にそのタスクを完了する必要があります。 gRPC メソッドが終了した後にコンテキスト、ストリーム リーダー、またはストリーム ライターを引き続き使用すると、エラーと予期しない動作が発生します。

次の例では、サーバー ストリーミング メソッドは呼び出しが完了した後に応答ストリームに書き込むことができます。

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    _ = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();
}

前の例では、ソリューションはメソッドを終了する前に書き込みタスクを待機します。

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    var writeTask = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();

    await writeTask;
}

その他の技術情報