当前位置: 首页 > 测试知识 > Gatling数据库性能关联测试JDBC连接和SQL执行时间监控
Gatling数据库性能关联测试JDBC连接和SQL执行时间监控
2025-12-18 作者cwb 浏览次数17

Gatling中进行数据库性能关联测试将JDBC协议和HTTP协议集成在同一场景中,建立前端用户压力和后端数据库负载之间的因果关系,精准定位从应用层到数据层的性能瓶颈。


Gatling JDBC协议集成

Gatling JDBC模块允许您直接对数据库进行负载测试,和HTTP请求混合编排,模拟真实业务中应用服务器和数据库的交互。


1. 基础环境配置

在build.sbt或pom.xml中必须显式引入JDBC依赖:


scala

// build.sbt

libraryDependencies += "io.gatling" % "gatling-jdbc" % "3.9.5"

在Scala测试类中导入:


scala

import io.gatling.jdbc.Predef._

import java.sql.PreparedStatement


2. JDBC连接池配置

连接池配置直接决定测试的真实性和数据库压力模式:


scala

val jdbcConfig = jdbc

  .url("jdbc:mysql://${DB_HOST}:3306/${DB_NAME}")

  .username("${DB_USER}")

  .password("${DB_PASSWORD}")

  .driver("com.mysql.cj.jdbc.Driver")

  // 连接池核心配置(建议和生产环境对齐)

  .maximumPoolSize(50)           // 最大连接数

  .minimumIdle(10)              // 最小空闲连接

  .connectionTimeout(30000)     // 连接超时(ms)

  .idleTimeout(600000)          // 空闲连接超时

  .maxLifetime(1800000)         // 连接最大生命周期

  .validationQuery("SELECT 1")  // 连接健康检查SQL


SQL执行时间监控的实现方式


1. 基础SQL执行和计时

Gatling会自动记录每条SQL的执行时间,但需要写好检查点:


scala

val scn = scenario("数据库性能测试")

  .exec(

    jdbc("查询用户订单")

      .select("SELECT * FROM orders WHERE user_id = ?")

      .params("${userId}")  // 参数化查询

      .check(

        // 检查执行结果

        jdbcResponseTime.mean.lt(50),     // 平均响应时间<50ms

        jdbcResponseTime.max.lt(200),     // 最大响应时间<200ms

        jdbcResponseTime.percentile4.lt(100), // P95<100ms

        // 验证数据正确性

        jdbcColumn("order_id").count.gt(0)

      )

  )


2. 监控SQL执行

要实现更详细的监控,需要捕获执行计划:


scala

.exec(

  jdbc("复杂查询剖析")

    .select("""

      EXPLAIN ANALYZE 

      SELECT o.*, u.username 

      FROM orders o

      JOIN users u ON o.user_id = u.id

      WHERE o.created_at > ?

      ORDER BY o.total_amount DESC

      LIMIT 100

    """)

    .params("2024-01-01")

    .check(

      jdbcColumn("Execution Time").findAll.saveAs("execution_plan")

    )

)

.exec { session =>

  // 解析执行计划中的主要指标

  val plan = session("execution_plan").as[Seq[String]]

  val planningTime = extractTime(plan, "Planning Time")

  val executionTime = extractTime(plan, "Execution Time")

  println(s"SQL剖析结果 - 规划时间: ${planningTime}ms, 执行时间: ${executionTime}ms")

  session

}


数据库和HTTP请求的关联测试

1. 混合场景设计模拟完整事务


scala

val mixedScenario = scenario("完整事务流程")

  .feed(userIds.feeder)

  .exec(

    http("用户登录API")

      .post("/api/login")

      .body(StringBody("""{"username":"${username}"}"""))

      .check(jsonPath("$.userId").saveAs("loggedInUserId"))

  )

  .pause(1.second)

  // 关键:验证API调用是否触发预期数据库操作

  .exec(

    jdbc("验证登录记录")

      .select("SELECT COUNT(*) as cnt FROM login_log WHERE user_id = ?")

      .params("${loggedInUserId}")

      .check(jdbcColumn("cnt").is(1))  // 断言恰好一条记录

  )

  .exec(

    http("提交订单API")

      .post("/api/order")

      .body(ElFileBody("templates/order.json"))

      .check(jsonPath("$.orderId").saveAs("apiOrderId"))

  )

  // 验证订单是否持久化且数据一致

  .exec(

    jdbc("验证订单数据一致性")

      .select("""

        SELECT o.status, o.total_amount, oi.item_count 

        FROM orders o 

        JOIN order_items oi ON o.id = oi.order_id

        WHERE o.id = ?

      """)

      .params("${apiOrderId}")

      .check(

        jdbcColumn("status").is("PENDING"),

        jdbcColumn("total_amount").notNull

      )

  )


2. 竞态条件和并发问题测试


scala

val concurrencyScenario = scenario("库存扣减并发测试")

  .feed(productIds.feeder)

  .exec(

    jdbc("读取初始库存")

      .select("SELECT stock FROM products WHERE id = ?")

      .params("${productId}")

      .check(jdbcColumn("stock").saveAs("initialStock"))

  )

  // 模拟100个并发用户同时扣减库存

  .exec(

    jdbc("并发扣减库存")

      .update("""

        UPDATE products 

        SET stock = stock - 1 

        WHERE id = ? AND stock > 0

      """)

      .params("${productId}")

      .check(jdbcRowsUpdated.is(1)) // 断言只影响一行

  )

  .exec(

    jdbc("验证最终库存")

      .select("SELECT stock FROM products WHERE id = ?")

      .params("${productId}")

      .check(jdbcColumn("stock").is(session => {

        val initial = session("initialStock").as[Int]

        initial - 1  // 应正好减少1

      }))

  )


监控诊断高级配置

1. 慢SQL捕获和告警


scala

val slowSqlThreshold = 100 // 定义慢SQL阈值(ms)


val scn = scenario("慢SQL监控")

  .exec(

    jdbc("潜在慢查询")

      .select("""

        SELECT * FROM large_table 

        WHERE created_at BETWEEN ? AND ?

        ORDER BY complex_calculation(column)

      """)

      .params("${startDate}", "${endDate}")

      .check(

        jdbcResponseTime.max.saveAs("sqlDuration")

      )

  )

  .doIf(session => session("sqlDuration").as[Int] > slowSqlThreshold) {

    exec(session => {

      val duration = session("sqlDuration").as[Int]

      // 触发告警逻辑

      println(s"慢SQL告警: 执行时间 ${duration}ms")

      // 可集成到外部监控系统

      sendToMonitoringSystem(session)

      session

    })

  }


2. 连接池性能监控


scala

.exec(session => {

  // 定期采样连接池状态

  val jdbcStats = jdbcConfig.connectionPool.getStats

  println(s"""

    |连接池状态报告:

    |活跃连接: ${jdbcStats.getActiveConnections}

    |空闲连接: ${jdbcStats.getIdleConnections}

    |等待线程: ${jdbcStats.getThreadsAwaitingConnection}

    |连接获取平均等待时间: ${jdbcStats.getConnectionTimeout}

  """.stripMargin)

  

  // 连接泄漏检测

  if (jdbcStats.getActiveConnections > jdbcStats.getMaxConnections * 0.8) {

    println("连接池接近饱和,可能存在连接泄漏")

  }

  session

})


3. 和APM工具集成

通过自定义检查点将数据发送到New Relic、Datadog等APM:


scala

.check(

  jdbcResponseTime.max.transform(duration => {

    // 发送自定义指标到APM

    val tags = Map(

      "sql_operation" -> "select_user_orders",

      "test_scenario" -> "checkout_flow"

    )

    apmClient.sendMetric("database.query.duration", duration, tags)

    duration

  })

)


数据库压力测试示例


scala

class EcommerceDbTest extends Simulation {

  

  val jdbcConfig = jdbc

    .url("jdbc:mysql://localhost:3306/ecommerce")

    .username("perf_test")

    .password("test123")

    .maximumPoolSize(30)

  

  val httpProtocol = http.baseUrl("http://localhost:8080")

  

  val dbFeeder = csv("data/product_ids.csv").circular

  

  val orderCheckoutScenario = scenario("数据库密集型下单流程")

    .feed(dbFeeder)

    .exec(

      http("浏览商品")

        .get("/api/products/${productId}")

        .check(jsonPath("$.price").saveAs("productPrice"))

    )

    .exec(

      jdbc("获取实时库存")

        .select("""

          SELECT available_stock, warehouse_id 

          FROM inventory 

          WHERE product_id = ? 

          FOR UPDATE NOWAIT

        """)

        .params("${productId}")

        .check(

          jdbcColumn("available_stock").gt(0),

          jdbcColumn("warehouse_id").saveAs("warehouseId")

        )

    )

    .tryMax(3) { // 库存更新重试机制

      exec(

        jdbc("原子性扣减库存")

          .update("""

            UPDATE inventory 

            SET available_stock = available_stock - 1,

                locked_stock = locked_stock + 1

            WHERE product_id = ? 

              AND available_stock > 0

            RETURNING updated_rows

          """)

          .params("${productId}")

          .check(jdbcRowsUpdated.is(1))

      )

    }

    .exec(

      http("确认下单")

        .post("/api/orders")

        .body(StringBody(

          """{"productId":${productId},"quantity":1}"""

        ))

        .check(status.is(201))

    )

    .exec(

      jdbc("事务完整性验证")

        .select("""

          SELECT 

            (SELECT available_stock FROM inventory WHERE product_id = ?) as final_stock,

            (SELECT COUNT(*) FROM orders WHERE product_id = ?) as order_count

          FROM dual

        """)

        .params("${productId}", "${productId}")

        .check(

          jdbcColumn("final_stock").transform(_ == session("initialStock").as[Int] - 1),

          jdbcColumn("order_count").is(1)

        )

    )

  

  // 注入策略:重点测试数据库并发处理能力

  setUp(

    orderCheckoutScenario.inject(

      rampUsersPerSec(10).to(100).during(5.minutes)

    )

  ).protocols(jdbcConfig, httpProtocol)

    .assertions(

      global.jdbcResponseTime.percentile4.lt(150), // P95<150ms

      jdbcAllRequests.percentile4.lt(200),        // 所有SQL的P95<200ms

      jdbcFailedRequests.percent.is(0)            // 数据库零失败

    )

}


指标诊断

数据库指标:

SQL响应时间分布:P50/P95/P99值,识别长尾效应

连接池使用率:活跃连接数/最大连接数,>80%需告警

事务成功率:提交和回滚比例

锁等待时间:特别是FOR UPDATE查询


关联分析:

比较HTTP P95响应时间和对应SQL的P95执行时间

识别N+1查询问题:单个HTTP请求触发的SQL数量

验证数据库操作是否和业务事务边界一致


这种测试方法能精确暴露连接池配置不当、缺失索引、事务隔离问题、锁竞争等数据库层瓶颈,实现真正的全链路性能可见。

文章标签: 测试工具 软件测试 数据库测试 数据库系统检测
咨询软件测试