当前位置: 首页 > 测试知识 > 使用Gatling进行gRPC服务的高并发性能与流式调用测试
使用Gatling进行gRPC服务的高并发性能与流式调用测试
2025-11-26 作者cwb 浏览次数41

gRPC协议特性和测试挑战

gRPC作为基于HTTP/2的高性能RPC框架,在现代微服务架构中广泛应用。其双向流、多路复用等特性为性能测试带来了独特挑战。Gatling通过专门的gRPC插件提供完整的测试解决方案,能够充分验证gRPC服务在高并发场景下的表现。


测试环境搭建和依赖配置

Gatling gRPC插件集成

首先需要在项目中添加Gatling gRPC依赖,以SBT构建工具为例:


scala

// 在project/plugins.sbt中添加

addSbtPlugin("io.gatling" % "gatling-sbt" % "4.2.9")


// 在build.sbt中配置依赖

libraryDependencies ++= Seq(

  "io.gatling" % "gatling-core" % "3.9.5",

  "io.gatling" % "gatling-grpc" % "3.9.5",

  "io.grpc" % "grpc-netty" % "1.56.1",

  "io.grpc" % "grpc-protobuf" % "1.56.1",

  "io.grpc" % "grpc-stub" % "1.56.1",

  "com.thesamet.scalapb" %% "scalapb-runtime" % "0.11.13",

  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % "0.11.13"

)


// 配置ScalaPB编译插件

Compile / PB.targets := Seq(

  scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"

)



Protocol Buffers定义和代码生成

定义gRPC服务接口和消息格式:


protobuf

// src/main/protobuf/example_service.proto

syntax = "proto3";


package com.example;


option java_package = "com.example.protos";

option java_outer_classname = "ExampleServiceProto";


service ExampleService {

  // 一元RPC调用

  rpc SimpleRequest (SimpleRequestMessage) returns (SimpleResponseMessage) {}

  

  // 服务端流式RPC

  rpc ServerStreamRequest (StreamRequestMessage) returns (stream StreamResponseMessage) {}

  

  // 客户端流式RPC  

  rpc ClientStreamRequest (stream StreamRequestMessage) returns (StreamResponseMessage) {}

  

  // 双向流式RPC

  rpc BidirectionalStream (stream StreamRequestMessage) returns (stream StreamResponseMessage) {}

}


message SimpleRequestMessage {

  string request_id = 1;

  int32 payload_size = 2;

  map<string, string> metadata = 3;

}


message SimpleResponseMessage {

  string response_id = 1;

  bytes payload = 2;

  Status status = 3;

  int64 processing_time_ms = 4;

}


message StreamRequestMessage {

  string stream_id = 1;

  int32 sequence_number = 2;

  bytes chunk_data = 3;

  bool is_complete = 4;

}


message StreamResponseMessage {

  string stream_id = 1;

  int32 sequence_number = 2;

  bytes chunk_data = 3;

  Status status = 4;

}


message Status {

  int32 code = 1;

  string message = 2;

}


基础一元RPC调用性能测试

简单请求响应测试场景

构建基本的gRPC一元调用性能测试:


scala

import io.gatling.core.Predef._

import io.gatling.grpc.Predef._

import io.grpc._

import com.example.protos.ExampleServiceProto._

import scala.concurrent.duration._


class BasicGrpcSimulation extends Simulation {

  

  // gRPC协议配置

  val grpcProtocol = grpc(ManagedChannelBuilder.forAddress("localhost", 50051))

    .usePlaintext()  // 测试环境使用明文传输

    .maxInboundMessageSize(16 * 1024 * 1024)  // 16MB最大消息大小

    .enableKeepAlive(true)

    .keepAliveTime(30, TimeUnit.SECONDS)

    .keepAliveTimeout(5, TimeUnit.SECONDS)

  

  // 构建请求消息

  def createSimpleRequest(): SimpleRequestMessage = {

    SimpleRequestMessage()

      .withRequestId(java.util.UUID.randomUUID().toString)

      .withPayloadSize(1024)

      .addMetadata("client_timestamp", System.currentTimeMillis().toString)

  }

  

  // 定义检查点验证响应

  val responseCheck = grpc("simple_response_check")

    .check(

      // 验证状态码

      grpcExtract({ r: SimpleResponseMessage => r.status.map(_.code) }).is(0),

      // 验证响应ID存在

      grpcExtract({ r: SimpleResponseMessage => r.responseId.nonEmpty }).is(true),

      // 提取处理时间用于断言

      grpcExtract({ r: SimpleResponseMessage => r.processingTimeMs }).saveAs("proc_time")

    )

  

  // 测试场景定义

  val simpleCallScenario = scenario("gRPC Simple Call Test")

    .exec(

      grpc("simple_unary_call")

        .rpc(ExampleServiceGrpc.METHOD_SIMPLE_REQUEST)

        .payload(createSimpleRequest())

        .extract(responseCheck)

    )

    .pause(100.milliseconds)  // 模拟用户思考时间

  

  // 负载模型配置

  setUp(

    simpleCallScenario.inject(

      // 预热阶段

      rampUsersPerSec(1).to(10).during(1.minute),

      // 稳定负载阶段

      constantUsersPerSec(20).during(5.minutes),

      // 压力测试阶段

      rampUsersPerSec(20).to(100).during(2.minutes),

      // 峰值测试

      constantUsersPerSec(100).during(3.minutes)

    )

  ).protocols(grpcProtocol)

    .assertions(

      global.responseTime.percentile3.lt(500),  // P99响应时间<500ms

      global.failedRequests.percent.lt(1.0),    // 错误率<1%

      forAll.responseTime.percentile4.lt(1000)  // P99.9响应时间<1s

    )

    .maxDuration(15.minutes)

}

高级一元调用测试特性

实现更复杂的一元调用测试场景:


scala

class AdvancedUnaryGrpcSimulation extends Simulation {

  

  val grpcProtocol = grpc(ManagedChannelBuilder.forAddress("loadbalancer.example.com", 50051))

    .usePlaintext()

    .defaultHeaders(

      Map(

        "client-version" -> "gatling/3.9.5",

        "user-agent" -> "gatling-grpc-loadtest"

      )

    )

    .overrideAuthority("api.example.com")  // TLS证书验证

  

  // 参数化请求构建器

  val requestFeeder = csv("test_data/grpc_requests.csv").circular

  val dynamicPayloadFeeder = Iterator.continually(Map(

    "dynamic_payload" -> generateRandomPayload(util.Random.nextInt(2048)),

    "correlation_id" -> java.util.UUID.randomUUID().toString

  ))

  

  def generateRandomPayload(size: Int): String = {

    // 生成指定大小的测试数据

    util.Random.alphanumeric.take(size).mkString

  }

  

  val dynamicRequestScenario = scenario("Dynamic Unary Calls")

    .feed(requestFeeder)

    .feed(dynamicPayloadFeeder)

    .exec(

      grpc("parameterized_unary_call")

        .rpc(ExampleServiceGrpc.METHOD_SIMPLE_REQUEST)

        .payload(SimpleRequestMessage()

          .withRequestId("${correlation_id}")

          .withPayloadSize("${payload_size}".toInt)

          .addMetadata("test_case", "${test_case}")

          .addMetadata("dynamic_data", "${dynamic_payload}")

        )

        .header("x-correlation-id")("${correlation_id}")

        .check(

          grpcExtract({ r: SimpleResponseMessage => r.status.flatMap(_.message) }).saveAs("status_msg"),

          grpcExtract({ r: SimpleResponseMessage => r.processingTimeMs }).lt(1000)

        )

    )

    .exec { session =>

      // 记录自定义指标

      val procTime = session("proc_time").as[Long]

      val statusMsg = session("status_msg").asOption[String]

      

      // 可以在这里添加自定义日志或指标收集

      if (procTime > 500) {

        // 记录慢请求

        println(s"Slow request detected: ${session("correlation_id").as[String]} took ${procTime}ms")

      }

      session

    }

  

  // 混合负载场景

  val mixedLoadScenario = scenario("Mixed gRPC Load")

    .randomSwitch(

      70.0 -> exec("fast_requests", 

        grpc("fast_unary")

          .rpc(ExampleServiceGrpc.METHOD_SIMPLE_REQUEST)

          .payload(createSimpleRequest().withPayloadSize(128))

      ),

      20.0 -> exec("medium_requests",

        grpc("medium_unary")  

          .rpc(ExampleServiceGrpc.METHOD_SIMPLE_REQUEST)

          .payload(createSimpleRequest().withPayloadSize(1024))

      ),

      10.0 -> exec("large_requests",

        grpc("large_unary")

          .rpc(ExampleServiceGrpc.METHOD_SIMPLE_REQUEST) 

          .payload(createSimpleRequest().withPayloadSize(8192))

      )

    )

  

  setUp(

    dynamicRequestScenario.inject(rampUsers(100).during(2.minutes)),

    mixedLoadScenario.inject(constantUsersPerSec(5).during(10.minutes))

  ).protocols(grpcProtocol)

}


流式RPC调用性能测试

服务端流式调用测试

测试服务端推送数据的流式场景:


scala

class ServerStreamingGrpcSimulation extends Simulation {

  

  val grpcProtocol = grpc(ManagedChannelBuilder.forAddress("localhost", 50051))

    .usePlaintext()

    .maxInboundMessageSize(32 * 1024 * 1024)

  

  // 服务端流式调用测试

  val serverStreamScenario = scenario("Server Streaming Test")

    .exec(

      grpc("server_stream_call")

        .rpc(ExampleServiceGrpc.METHOD_SERVER_STREAM_REQUEST)

        .payload(StreamRequestMessage()

          .withStreamId(java.util.UUID.randomUUID().toString)

          .withSequenceNumber(0)

        )

        .stream

        .collect(

          // 收集流式响应并验证

          grpcExtract({ r: StreamResponseMessage => r.sequenceNumber }).saveAs("seq_num"),

          grpcExtract({ r: StreamResponseMessage => r.chunkData.size }).saveAs("chunk_size"),

          grpcExtract({ r: StreamResponseMessage => r.status.flatMap(_.code) }).is(0)

        )

        .endOnStatus(_.exists(_.code != 0))  // 遇到错误状态时结束流

        .endOnCount(100)  // 最多接收100条消息

        .endOnTimeout(30.seconds)  // 30秒超时

        .check(

          // 流级别检查

          grpcStreamTotalCount.gt(10),  // 至少收到10条消息

          grpcStreamCompletionCode.is(0)  // 流正常结束

        )

    )

    .exec { session =>

      // 分析流式调用结果

      val totalMessages = session("grpcStreamTotalCount").asOption[Int].getOrElse(0)

      val avgChunkSize = session("grpcStreamAvgChunkSize").asOption[Int].getOrElse(0)

      

      println(s"Server stream completed: $totalMessages messages, avg chunk: $avgChunkSize bytes")

      session

    }

  

  setUp(

    serverStreamScenario.inject(

      rampUsersPerSec(1).to(5).during(1.minute),

      constantUsersPerSec(5).during(5.minutes)

    )

  ).protocols(grpcProtocol)

}


客户端流式调用测试

测试客户端持续发送数据的流式场景:


scala

class ClientStreamingGrpcSimulation extends Simulation {

  

  val grpcProtocol = grpc(ManagedChannelBuilder.forAddress("localhost", 50051))

    .usePlaintext()

  

  // 生成客户端流数据

  val streamDataFeeder = Iterator.from(0).map { i => 

    Map(

      "chunk_seq" -> i,

      "chunk_data" -> generateChunkData(512 + util.Random.nextInt(1024)),

      "is_last" -> (i >= 49)  // 发送50条后结束

    )

  }

  

  def generateChunkData(size: Int): String = {

    // 生成测试数据块

    util.Random.alphanumeric.take(size).mkString

  }

  

  val clientStreamScenario = scenario("Client Streaming Test")

    .feed(streamDataFeeder)

    .exec(

      grpc("client_stream_call")

        .rpc(ExampleServiceGrpc.METHOD_CLIENT_STREAM_REQUEST)

        .stream

        .send(

          StreamRequestMessage()

            .withStreamId(java.util.UUID.randomUUID().toString)

            .withSequenceNumber("${chunk_seq}")

            .withChunkData(com.google.protobuf.ByteString.copyFromUtf8("${chunk_data}"))

            .withIsComplete("${is_last}")

        )

        .endOnCondition(session => session("is_last").as[Boolean])

        .check(

          grpcExtract({ r: StreamResponseMessage => r.status.flatMap(_.code) }).is(0),

          grpcExtract({ r: StreamResponseMessage => r.streamId }).saveAs("completed_stream_id")

        )

    )

    .exec { session =>

      // 验证客户端流完成

      val streamId = session("completed_stream_id").asOption[String]

      val totalSent = session("chunk_seq").as[Int] + 1

      println(s"Client stream completed: $streamId, sent $totalSent chunks")

      session

    }

  

  setUp(

    clientStreamScenario.inject(

      constantUsersPerSec(2).during(10.minutes)  // 控制并发流数量

    )

  ).protocols(grpcProtocol)

}


双向流式调用测试

测试全双工双向流式通信:


scala

class BidirectionalStreamingGrpcSimulation extends Simulation {

  

  val grpcProtocol = grpc(ManagedChannelBuilder.forAddress("localhost", 50051))

    .usePlaintext()

    .maxInboundMessageSize(16 * 1024 * 1024)

  

  // 双向流测试场景

  val bidirectionalStreamScenario = scenario("Bidirectional Streaming Test")

    .exec(

      grpc("bidirectional_stream")

        .rpc(ExampleServiceGrpc.METHOD_BIDIRECTIONAL_STREAM)

        .stream

        .bidirectional(

          // 发送逻辑

          (session, stream) => {

            val streamId = java.util.UUID.randomUUID().toString

            var sequence = 0

            

            // 启动发送协程

            stream.onSend { () =>

              if (sequence < 20) {

                Some(

                  StreamRequestMessage()

                    .withStreamId(streamId)

                    .withSequenceNumber(sequence)

                    .withChunkData(com.google.protobuf.ByteString.copyFromUtf8(s"Message $sequence"))

                    .withIsComplete(sequence == 19)

                ).tap { _ => sequence += 1 }

              } else {

                None

              }

            }

            

            // 接收处理逻辑

            stream.onReceive { response: StreamResponseMessage =>

              // 验证响应

              require(response.streamId == streamId, "Stream ID mismatch")

              require(response.sequenceNumber >= 0, "Invalid sequence number")

              

              // 可以在这里添加业务逻辑验证

              if (response.status.exists(_.code != 0)) {

                println(s"Stream $streamId received error: ${response.status.flatMap(_.message)}")

              }

            }

            

            // 返回更新后的session

            session.set("current_stream_id", streamId)

          }

        )

        .endOnTimeout(30.seconds)

        .endOnStatus(_.exists(_.code != 0))

        .check(

          grpcStreamCompletionCode.is(0),

          grpcStreamMessageCount.between(10, 25)  // 预期消息数量范围

        )

    )

    .exec { session =>

      // 流结束后处理

      val streamId = session("current_stream_id").asOption[String]

      val messageCount = session("grpcStreamMessageCount").asOption[Int]

      println(s"Bidirectional stream $streamId completed with $messageCount messages")

      session

    }

  

  // 复杂双向流测试 - 模拟聊天场景

  val chatStreamScenario = scenario("Chat-like Bidirectional Stream")

    .exec(

      grpc("chat_stream")

        .rpc(ExampleServiceGrpc.METHOD_BIDIRECTIONAL_STREAM)

        .stream

        .bidirectional(

          (session, stream) => {

            val userId = s"user_${util.Random.nextInt(1000)}"

            var messageCount = 0

            var receivedCount = 0

            

            // 定时发送消息

            stream.onSend { () =>

              if (messageCount < 10 && util.Random.nextDouble() < 0.3) {

                // 30%概率发送消息

                messageCount += 1

                Some(

                  StreamRequestMessage()

                    .withStreamId(userId)

                    .withSequenceNumber(messageCount)

                    .withChunkData(com.google.protobuf.ByteString.copyFromUtf8(

                      s"Message ${messageCount} from $userId at ${System.currentTimeMillis()}"

                    ))

                    .withIsComplete(messageCount == 10)

                )

              } else if (messageCount >= 10) {

                None

              } else {

                // 等待下一次发送机会

                Some(null) // 返回null表示本次不发送,但保持流活跃

              }

            }

            

            // 处理接收到的消息

            stream.onReceive { response: StreamResponseMessage =>

              receivedCount += 1

              // 可以在这里实现消息处理逻辑

              if (receivedCount % 5 == 0) {

                println(s"User $userId received $receivedCount messages")

              }

            }

            

            session

              .set("user_id", userId)

              .set("sent_messages", messageCount)

              .set("received_messages", receivedCount)

          }

        )

        .endOnTimeout(2.minutes)

        .check(

          grpcStreamCompletionCode.is(0)

        )

    )

  

  setUp(

    bidirectionalStreamScenario.inject(rampUsers(50).during(1.minute)),

    chatStreamScenario.inject(constantUsersPerSec(1).during(5.minutes))

  ).protocols(grpcProtocol)

    .assertions(

      global.failedRequests.percent.lt(5.0),  // 双向流允许稍高的错误率

      global.responseTime.percentile3.lt(2000) // P99响应时间<2s

    )

}



高级测试特性和监控

自定义指标收集


scala

class AdvancedMonitoringGrpcSimulation extends Simulation {

  

  // 自定义指标收集器

  val customMetrics = new CustomGrpcMetrics()

  

  val grpcProtocol = grpc(ManagedChannelBuilder.forAddress("localhost", 50051))

    .usePlaintext()

    .disableWarmUp(true)  // 手动控制预热

  

  val monitoredScenario = scenario("Monitored gRPC Test")

    .exec(

      grpc("monitored_call")

        .rpc(ExampleServiceGrpc.METHOD_SIMPLE_REQUEST)

        .payload(createSimpleRequest())

        .check(

          grpcExtract({ r: SimpleResponseMessage => r.processingTimeMs }).saveAs("proc_time"),

          grpcExtract({ r: SimpleResponseMessage => r.payload.size }).saveAs("response_size")

        )

    )

    .exec { session =>

      // 收集自定义指标

      val procTime = session("proc_time").as[Long]

      val respSize = session("response_size").as[Int]

      val status = if (session.isFailed) "failure" else "success"

      

      customMetrics.recordCall(procTime, respSize, status)

      session

    }

  

  // 预热阶段

  val warmUpScenario = scenario("Warm Up")

    .exec(grpc("warmup_call")

      .rpc(ExampleServiceGrpc.METHOD_SIMPLE_REQUEST)

      .payload(createSimpleRequest())

    )

  

  // 主测试前执行预热

  setUp(

    warmUpScenario.inject(rampUsers(10).during(30.seconds))

      .andThen(

        monitoredScenario.inject(

          constantUsersPerSec(20).during(10.minutes)

        )

      )

  ).protocols(grpcProtocol)

}


// 自定义指标收集类

class CustomGrpcMetrics {

  private val callTimings = new java.util.concurrent.ConcurrentLinkedQueue[Long]()

  private val responseSizes = new java.util.concurrent.ConcurrentLinkedQueue[Int]()

  private val callStatuses = new java.util.concurrent.ConcurrentHashMap[String, Int]()

  

  def recordCall(processingTime: Long, responseSize: Int, status: String): Unit = {

    callTimings.offer(processingTime)

    responseSizes.offer(responseSize)

    callStatuses.merge(status, 1, Integer.sum)

    

    // 定期输出统计信息

    if (callTimings.size() % 100 == 0) {

      println(s"Metrics snapshot - Calls: ${callTimings.size()}, " +

        s"Avg time: ${callTimings.stream().mapToLong(_.toLong).average().orElse(0)}ms, " +

        s"Statuses: $callStatuses")

    }

  }

}



错误处理和重试机制


scala

class ResilientGrpcSimulation extends Simulation {

  

  val grpcProtocol = grpc(ManagedChannelBuilder.forAddress("localhost", 50051))

    .usePlaintext()

    .maxRetryAttempts(3)  // 最大重试次数

    .retryBackoff(100.milliseconds, 2.0, 1.second)  // 退避策略

  

  val resilientScenario = scenario("Resilient gRPC Test")

    .tryMax(2) {  // 场景级别重试

      exec(

        grpc("resilient_call")

          .rpc(ExampleServiceGrpc.METHOD_SIMPLE_REQUEST)

          .payload(createSimpleRequest())

          .check(

            grpcExtract({ r: SimpleResponseMessage => r.status.flatMap(_.code) }).is(0)

          )

          .retryOnStatus(Status.UNAVAILABLE.getCode)  // 服务不可用时重试

          .retryOnStatus(Status.DEADLINE_EXCEEDED.getCode)  // 超时时重试

      )

    }

    .exitHereIfFailed  // 如果重试后仍然失败则退出

  

  // 测试不同错误场景的恢复能力

  val errorScenario = scenario("Error Handling Test")

    .exec(

      grpc("error_prone_call")

        .rpc(ExampleServiceGrpc.METHOD_SIMPLE_REQUEST)

        .payload(createSimpleRequest().withPayloadSize(-1))  // 故意制造错误

        .check(

          grpcExtract({ r: SimpleResponseMessage => r.status.flatMap(_.code) })

            .transform {

              case 0 => "success"

              case code => s"error_$code"

            }

            .saveAs("call_result")

        )

    )

    .doIf(session => session("call_result").as[String].startsWith("error")) {

      // 错误处理逻辑

      exec { session =>

        println(s"Call failed with: ${session("call_result").as[String]}")

        session

      }

    }

  

  setUp(

    resilientScenario.inject(constantUsersPerSec(10).during(5.minutes)),

    errorScenario.inject(constantUsersPerSec(1).during(2.minutes))

  ).protocols(grpcProtocol)

}


通过以上全面的测试方案,可以充分验证gRPC服务在各种场景下的性能表现,包括高并发一元调用、各种流式模式以及系统的容错能力和资源使用情况。

文章标签: 并发压力测试 测评网站并发压力 软件应用性能测试 应用性能测试 接口性能测试 软件性能测试 性能测试 压力测试 测试工具
咨询软件测试