当前位置: 首页 > 质量专栏 > 软件压力测试工具Gatling虚拟用户注入策略详解:rampUsers、atOnceUsers、constantUsersPerSec与分层人口模型
软件压力测试工具Gatling虚拟用户注入策略详解:rampUsers、atOnceUsers、constantUsersPerSec与分层人口模型
2025-11-24 作者cwb 浏览次数62

Gatling虚拟用户注入方式详解:rampUsers、atOnceUsers、constantUsersPerSec和分层人口模型


1. Gatling负载注入架构和主要概念

注入策略执行引擎

Gatling的虚拟用户注入建立在响应式流控制架构上,通过用户注入器和场景调度器的协同工作实现精确的负载控制。


Gatling注入策略内部执行流程

Injection Profile → User Stream Generator → Scenario Scheduler → Virtual User Lifecycle


虚拟用户生命周期状态


scala

trait UserState {

  // 用户状态转换

  case object Waiting extends UserState

  case object Starting extends UserState  

  case object Running extends UserState

  case object Finished extends UserState

  case object Failed extends UserState

}


2. 基础注入深度解析

rampUsers-线性递增负载


scala

class RampUsersInjectionSimulation extends Simulation {

  

  val httpProtocol = http.baseUrl("https://api.example.com")

  

  val scn = scenario("Ramp Users Load Test")

    .exec(http("API Request")

      .get("/endpoint")

      .check(status.is(200)))

  

  // 基础rampUsers用法

  setUp(

    scn.inject(

      rampUsers(100).during(60) // 60秒内线性增加到100用户

    )

  ).protocols(httpProtocol)

  

  // 高级rampUsers配置

  val advancedRampSetup = setUp(

    scn.inject(

      nothingFor(10), // 先等待10秒

      rampUsers(50).during(30), // 30秒内增加到50用户

      nothingFor(20), // 保持50用户20秒

      rampUsers(100).during(45) // 45秒内再增加到100用户

    )

  )

  

  // rampUsers的数学原理

  object RampUsersMath {

    /**

     * rampUsers的线性增长函数

     * @param totalUsers 总用户数

     * @param duration 持续时间(秒)

     * @return 每秒应该启动的用户数序列

     */

    def calculateRampInjection(totalUsers: Int, duration: Int): Seq[Double] = {

      val usersPerSecond = totalUsers.toDouble / duration

      (1 to duration).map(_ => usersPerSecond)

    }

    

    // 实际每秒启动用户数会受系统性能影响

    val expectedRamp = calculateRampInjection(100, 60)

    // 结果: 每秒约启动1.67个用户

  }

}

atOnceUsers-瞬时并发冲击


scala

class AtOnceUsersInjectionSimulation extends Simulation {

  

  val spikeScenario = scenario("Instant User Spike")

    .exec(http("Spike Request")

      .get("/stress-endpoint")

      .check(status.is(200)))

  

  // 基础瞬时并发测试

  setUp(

    spikeScenario.inject(

      atOnceUsers(100) // 立即同时启动100个用户

    )

  ).protocols(httpProtocol)

  

  // 多阶段瞬时冲击

  val multiPhaseSpike = setUp(

    spikeScenario.inject(

      atOnceUsers(50),  // 第一阶段:50用户瞬时冲击

      nothingFor(30),   // 观察30秒

      atOnceUsers(100), // 第二阶段:100用户瞬时冲击  

      nothingFor(30),   // 观察30秒

      atOnceUsers(200)  // 第三阶段:200用户瞬时冲击

    )

  )

  

  // atOnceUsers的技术实现细节

  object AtOnceTechnicalDetails {

    /**

     * atOnceUsers的执行时序

     * 所有用户在同一时刻开始执行,但实际启动时间会有微小差异

     */

    def executeAtOnceUsers(userCount: Int): Unit = {

      // Gatling使用Akka Actor系统并行启动用户

      val userActors = (1 to userCount).map { userId =>

        system.actorOf(UserActor.props, s"user-$userId")

      }

      

      // 所有Actor同时收到启动消息

      userActors.foreach { actor =>

        actor ! StartScenario

      }

    }

  }

  

  // 应用场景:系统容量测试

  val capacityTesting = scenario("System Capacity Test")

    .exec(

      atOnceUsers(500), // 测试系统瞬时处理能力

      atOnceUsers(1000) // 逐步增加找到系统极限

    )

}


constantUsersPerSec-恒定压力负载


scala

class ConstantUsersPerSecSimulation extends Simulation {

  

  val steadyLoadScenario = scenario("Constant Load")

    .exec(http("Steady Request")

      .get("/api")

      .check(status.is(200)))

  

  // 基础恒定负载

  setUp(

    steadyLoadScenario.inject(

      constantUsersPerSec(10).during(300) // 5分钟内保持每秒10用户

    )

  ).protocols(httpProtocol)

  

  // 多级恒定负载

  val multiLevelConstant = setUp(

    steadyLoadScenario.inject(

      constantUsersPerSec(5).during(60),   // 第1分钟:5用户/秒

      constantUsersPerSec(10).during(120), // 第2-3分钟:10用户/秒  

      constantUsersPerSec(15).during(120), // 第4-5分钟:15用户/秒

      constantUsersPerSec(5).during(60)    // 第6分钟:降回5用户/秒

    )

  )

  

  // constantUsersPerSec的调度算法

  object ConstantUsersScheduler {

    /**

     * 恒定用户速率调度算法

     * 使用令牌桶算法保证稳定的用户注入速率

     */

    class ConstantRateScheduler(usersPerSecond: Double, duration: Int) {

      private val tokenBucket = new TokenBucket(usersPerSecond)

      

      def scheduleUsers(): Seq[Double] = {

        val totalUsers = (usersPerSecond * duration).toInt

        val interval = 1.0 / usersPerSecond

        

        (0 until totalUsers).map { userIndex =>

          userIndex * interval

        }

      }

    }

    

    // 实际用户启动时间计算

    val schedule = new ConstantRateScheduler(10, 60)

    val userStartTimes = schedule.scheduleUsers()

    // 用户将在0.0s, 0.1s, 0.2s, ... 59.9s时刻启动

  }

  

  // 高级配置:随机化思考时间

  val realisticConstantLoad = scenario("Realistic Constant Load")

    .during(300) {

      exec(http("API Call")

        .get("/endpoint"))

      .pause(1, 5) // 1-5秒随机思考时间

    }

    .inject(

      constantUsersPerSec(20).during(300) // 考虑思考时间后的实际并发

    )

}


3. 高级注入策略和复合模式

rampUsersPerSec-速率线性递增


scala

class RampUsersPerSecSimulation extends Simulation {

  

  val rampRateScenario = scenario("Ramping Rate Load")

    .exec(http("Ramping Request")

      .get("/endpoint")

      .check(status.is(200)))

  

  // 基础速率递增

  setUp(

    rampRateScenario.inject(

      rampUsersPerSec(1).to(10).during(60) // 1分钟内从1用户/秒增加到10用户/秒

    )

  ).protocols(httpProtocol)

  

  // 多段速率递增

  val multiStageRamp = setUp(

    rampRateScenario.inject(

      rampUsersPerSec(1).to(5).during(30),   // 0-30秒: 1→5用户/秒

      rampUsersPerSec(5).to(20).during(60),  // 30-90秒: 5→20用户/秒

      rampUsersPerSec(20).to(5).during(30)   // 90-120秒: 20→5用户/秒

    )

  )

  

  // rampUsersPerSec的数学建模

  object RampRateMathematics {

    /**

     * 计算线性递增速率曲线

     */

    case class RampRate(from: Double, to: Double, duration: Int) {

      val slope: Double = (to-from) / duration

      

      def rateAtTime(t: Int): Double = {

        require(t >= 0 && t <= duration, s"Time $t out of range [0, $duration]")

        from + slope * t

      }

      

      def totalUsers: Double = {

        // 计算曲线下面积(积分)

        (from + to) * duration / 2.0

      }

    }

    

    val ramp = RampRate(1, 10, 60)

    println(s"总用户数: ${ramp.totalUsers}") // 330用户

    println(s"30秒时的速率: ${ramp.rateAtTime(30)}") // 5.5用户/秒

  }

}


heavisideUsers-S形增长曲线


scala

class HeavisideUsersSimulation extends Simulation {

  

  val smoothScenario = scenario("Smooth Load Increase")

    .exec(http("Smooth Request")

      .get("/endpoint")

      .check(status.is(200)))

  

  // Heaviside函数注入(S形曲线)

  setUp(

    smoothScenario.inject(

      heavisideUsers(1000).during(60) // 60秒内S形曲线增长到1000用户

    )

  ).protocols(httpProtocol)

  

  // Heaviside函数的数学原理

  object HeavisideMathematics {

    /**

     * Heaviside函数的变体-平滑S形曲线

     * 避免线性增长的尖锐拐点,更符合真实用户增长模式

     */

    def smoothHeaviside(t: Double, totalTime: Double): Double = {

      // 使用Sigmoid函数实现平滑过渡

      1.0 / (1.0 + math.exp(-10.0 * (t / totalTime-0.5)))

    }

    

    def calculateHeavisideInjection(totalUsers: Int, duration: Int): Seq[Int] = {

      (0 until duration).map { t =>

        val progress = smoothHeaviside(t, duration)

        (progress * totalUsers).toInt

      }

    }

    

    // 生成用户注入计划

    val injectionPlan = calculateHeavisideInjection(1000, 60)

    // 开始和结束阶段增长缓慢,中间阶段增长迅速

  }

  

  // 应用场景:模拟真实用户登录模式

  val realisticLoginPattern = scenario("Realistic Login Pattern")

    .inject(

      heavisideUsers(500).during(300) // 5分钟内模拟用户陆续登录

    )

}


4. 分层人口模型和用户行为模拟

多用户角色分层


scala

class LayeredPopulationSimulation extends Simulation {

  

  // 定义不同用户角色的行为模式

  val casualUsers = scenario("Casual Users")

    .exec(

      pause(5, 10), // 长时间思考

      http("Browse Action")

        .get("/browse")

        .check(status.is(200)),

      pause(2, 5)

    )

  

  val powerUsers = scenario("Power Users")  

    .exec(

      pause(1, 3), // 短时间思考

      http("Complex Action")

        .get("/complex-operation")

        .check(status.is(200)),

      pause(1, 2)

    )

  

  val adminUsers = scenario("Administrator Users")

    .exec(

      http("Admin Action")

        .get("/admin")

        .check(status.is(200)),

      pause(3, 8)

    )

  

  // 分层人口注入策略

  val layeredInjection = setUp(

    // 普通用户:70%-缓慢增长

    casualUsers.inject(

      rampUsers(70).during(120)

    ),

    

    // 高级用户:25%-中等速度增长  

    powerUsers.inject(

      rampUsers(25).during(90)

    ),

    

    // 管理员:5%-快速启动

    adminUsers.inject(

      atOnceUsers(5)

    )

  ).protocols(httpProtocol)

  

  // 高级分层策略:基于时间的动态比例

  object DynamicPopulationModel {

    

    case class UserSegment(

      name: String,

      scenario: ScenarioBuilder, 

      percentage: Double,

      behavior: String

    )

    

    val segments = Seq(

      UserSegment("Mobile Users", casualUsers, 0.4, "rampUsers(40).during(60)"),

      UserSegment("Desktop Users", powerUsers, 0.5, "rampUsers(50).during(45)"),

      UserSegment("Tablet Users", adminUsers, 0.1, "atOnceUsers(10)")

    )

    

    def createDynamicInjection(): SetUp = {

      val injections = segments.map { segment =>

        segment.scenario.inject(

          rampUsers((segment.percentage * 100).toInt).during(60)

        )

      }

      

      setUp(injections: _*).protocols(httpProtocol)

    }

  }

}


地理分布分层模型


scala

class GeographicDistributionSimulation extends Simulation {

  

  // 模拟不同地理区域的用户行为

  val northAmericaUsers = scenario("North America Users")

    .exec(

      http("NA Request")

        .get("/us-endpoint")

        .header("X-Region", "north-america")

        .check(status.is(200))

    )

  

  val europeUsers = scenario("Europe Users")

    .exec(

      http("EU Request")  

        .get("/eu-endpoint")

        .header("X-Region", "europe")

        .check(status.is(200))

    )

  

  val asiaPacificUsers = scenario("Asia Pacific Users")

    .exec(

      http("APAC Request")

        .get("/apac-endpoint") 

        .header("X-Region", "asia-pacific")

        .check(status.is(200))

    )

  

  // 基于地理时区的分层注入

  val geographicInjection = setUp(

    // 亚洲用户-亚洲工作时间

    asiaPacificUsers.inject(

      rampUsersPerSec(0).to(20).during(10800), // 3小时递增 (08:00-11:00)

      constantUsersPerSec(20).during(14400),   // 4小时稳定 (11:00-15:00)

      rampUsersPerSec(20).to(0).during(10800)  // 3小时递减 (15:00-18:00)

    ),

    

    // 欧洲用户-欧洲工作时间  

    europeUsers.inject(

      rampUsersPerSec(0).to(15).during(10800), // 3小时递增 (08:00-11:00 CET)

      constantUsersPerSec(15).during(14400),   // 4小时稳定 (11:00-15:00 CET)

      rampUsersPerSec(15).to(0).during(10800)  // 3小时递减 (15:00-18:00 CET)

    ),

    

    // 北美用户-北美工作时间

    northAmericaUsers.inject(

      rampUsersPerSec(0).to(25).during(10800), // 3小时递增 (08:00-11:00 EST)

      constantUsersPerSec(25).during(14400),   // 4小时稳定 (11:00-15:00 EST)  

      rampUsersPerSec(25).to(0).during(10800)  // 3小时递减 (15:00-18:00 EST)

    )

  ).protocols(httpProtocol)

}


5. 复合注入策略和高级模式

复合注入


scala

class CompositeInjectionSimulation extends Simulation {

  

  val complexScenario = scenario("Complex Load Pattern")

    .exec(http("Composite Request")

      .get("/endpoint")

      .check(status.is(200)))

  

  // 复杂复合注入策略

  val compositeSetup = setUp(

    complexScenario.inject(

      // 阶段1: 基线负载

      constantUsersPerSec(5).during(60),

      

      // 阶段2: 线性增长

      rampUsersPerSec(5).to(20).during(120),

      

      // 阶段3: 峰值冲击

      atOnceUsers(50),

      nothingFor(10),

      atOnceUsers(50),

      

      // 阶段4: 稳定高压

      constantUsersPerSec(25).during(180),

      

      // 阶段5: 缓慢下降

      rampUsersPerSec(25).to(5).during(120),

      

      // 阶段6: 波动负载

      constantUsersPerSec(5).during(30),

      atOnceUsers(25),

      constantUsersPerSec(5).during(30)

    )

  ).protocols(httpProtocol)

  

  // 基于业务指标的智能注入

  object BusinessDrivenInjection {

    

    case class BusinessHour(hour: Int, expectedUsers: Int)

    

    val businessHours = Seq(

      BusinessHour(9, 100),  // 9AM: 100用户

      BusinessHour(10, 200), // 10AM: 200用户  

      BusinessHour(11, 300), // 11AM: 300用户

      BusinessHour(12, 250), // 12PM: 250用户

      BusinessHour(13, 280), // 1PM: 280用户

      BusinessHour(14, 320), // 2PM: 320用户

      BusinessHour(15, 300), // 3PM: 300用户

      BusinessHour(16, 220), // 4PM: 220用户

      BusinessHour(17, 180)  // 5PM: 180用户

    )

    

    def createBusinessInjection(): Seq[InjectionStep] = {

      businessHours.flatMap { hour =>

        Seq(

          rampUsersPerSec(0).to(hour.expectedUsers / 60.0).during(600), // 10分钟递增

          constantUsersPerSec(hour.expectedUsers / 60.0).during(3000)   // 50分钟稳定

        )

      }

    }

  }

}


自定义注入开发


scala

// 自定义正弦波注入策略

class SineWaveInjection(

  minUsers: Double,

  maxUsers: Double, 

  period: Int,

  duration: Int

) extends InjectionStep {

  

  override def chain(chained: Iterator[FiniteDuration]): Iterator[FiniteDuration] = {

    val amplitude = (maxUsers-minUsers) / 2.0

    val center = minUsers + amplitude

    val angularFrequency = 2.0 * Math.PI / period

    

    val userStream = (0 until duration).iterator.flatMap { second =>

      val rate = center + amplitude * Math.sin(angularFrequency * second)

      val usersThisSecond = Math.max(0, Math.round(rate)).toInt

      

      if (usersThisSecond > 0) {

        val interval = 1.0 / usersThisSecond

        (0 until usersThisSecond).map { userIndex =>

          FiniteDuration((second + userIndex * interval).toLong, TimeUnit.SECONDS)

        }

      } else {

        Seq.empty

      }

    }

    

    userStream

  }

}


// 使用自定义正弦波注入

val sineWaveScenario = scenario("Sine Wave Load")

  .inject(

    new SineWaveInjection(

      minUsers = 10,

      maxUsers = 50, 

      period = 300, // 5分钟周期

      duration = 1800 // 30分钟测试

    )

  )


6. 性能优化

注入性能调优


scala

object InjectionOptimization {

  

  // 基于系统资源的自适应注入

  def adaptiveInjection(maxSystemLoad: Double): InjectionStep = {

    constantUsersPerSec(10).during(60) // 基础负载

      .andThen(

        rampUsersPerSec(10).to(100).during(300) // 渐进增加

          .throttle(

            reachRps(100).in(60),

            holdFor(300) 

          )

      )

  }

  

  // 内存优化的注入策略

  def memoryOptimizedInjection(totalUsers: Int, duration: Int): InjectionStep = {

    // 使用分批次注入避免内存溢出

    val batchSize = 1000

    val batches = (totalUsers + batchSize-1) / batchSize

    

    (0 until batches).foldLeft(nothingFor(0)) { (acc, batch) =>

      val batchUsers = Math.min(batchSize, totalUsers-batch * batchSize)

      acc.andThen(

        rampUsers(batchUsers).during(duration / batches)

      )

    }

  }

}



监控和断言配置


scala

class InjectionMonitoringSimulation extends Simulation {

  

  val monitoredScenario = scenario("Monitored Load Test")

    .exec(http("Monitored Request")

      .get("/endpoint")

      .check(status.is(200)))

  

  setUp(

    monitoredScenario.inject(

      rampUsers(100).during(60)

    )

  ).protocols(httpProtocol)

  .assertions(

    // 注入性能断言

    global.allRequests.percent.is(100),

    global.responseTime.percentile4.lt(800),

    

    // 用户注入速率断言

    details("Monitored Load Test").requestsPerSec.between(1, 2),

    

    // 并发用户数断言  

    details("Monitored Load Test").concurrentUsers.max.lt(110)

  )

  

  // 实时监控配置

  val monitoringConfig = setUp(

    monitoredScenario.inject(

      rampUsersPerSec(1).to(10).during(60)

    )

  ).maxDuration(300)

   .assertions(

     global.successfulRequests.percent.gt(99.5)

   )

}


这种专业的Gatling虚拟用户注入实现了从基础线性增长到复杂分层人口模型,保证了性能测试负载模式的控制和真实用户行为模拟。

文章标签: 测试工具 测评网站并发压力 并发压力测试 压力测试 软件测试
咨询软件测试