当前位置: 首页 > 测试知识 > 第三方软件测试报告书使用Gatling进行HTTP认证测试:Basic、Digest、OAuth 2.0与JWT令牌流程模拟
第三方软件测试报告书使用Gatling进行HTTP认证测试:Basic、Digest、OAuth 2.0与JWT令牌流程模拟
2025-11-24 作者cwb 浏览次数52

使用Gatling进行HTTP认证测试:Basic、Digest、OAuth 2.0与JWT令牌流程模拟

1. Gatling认证测试架构设计

认证测试的主要组件

Gatling的认证测试架构建立在异步非阻塞I/O模型之上,通过Session状态管理和协议配置层实现复杂的认证流程。主要组件包括:

HTTP协议配置器:管理基础认证参数和全局HTTP头

Session状态机:维护用户认证状态和令牌生命周期

检查点验证器:验证认证响应和令牌有效性

Feeders数据源:提供动态认证凭据和测试数据


认证流程的状态管理


scala

/** Session state key definitions: the attribute names under which auth data is stored. */
object AuthSessionKeys {
  final val ACCESS_TOKEN: String = "accessToken"
  final val REFRESH_TOKEN: String = "refreshToken"
  final val TOKEN_EXPIRY: String = "tokenExpiry"
  final val AUTH_STATE: String = "authState"
  final val CSRF_TOKEN: String = "csrfToken"
}


2. Basic认证深度实现

基础Basic认证配置


scala

import io.gatling.core.Predef._

import io.gatling.http.Predef._


/**
 * Gatling simulation that calls an endpoint protected by HTTP Basic authentication.
 *
 * NOTE(review): `credentialsFeeder` is not defined in this snippet; it presumably supplies
 * the `username` and `password` session attributes used below — confirm against the project.
 */
class BasicAuthSimulation extends Simulation {

  // Protocol-level defaults: base URL, static Basic credentials and JSON headers.
  val httpProtocol = http
    .baseUrl("https://api.example.com")
    .basicAuth("username", "password")
    .acceptHeader("application/json")
    .contentTypeHeader("application/json")

  // Request authenticated with the per-user credentials taken from the session.
  private val securedRequest =
    http("Basic Auth Request")
      .get("/secure-endpoint")
      .basicAuth("${username}", "${password}")
      .check(
        status.is(200),
        jsonPath("$.authenticated").is("true")
      )

  // Scenario: pull one credential record from the feeder, then issue the secured request.
  val basicAuthScenario = scenario("Dynamic Basic Authentication")
    .feed(credentialsFeeder)
    .exec(securedRequest)
}



高级Basic认证策略


scala

// Multi-tenant Basic authentication scenario.
//
// Builds the `Authorization: Basic ...` header by hand from the `username` and `password`
// session attributes, stores it under `authHeader`, then requests the tenant's resources and
// checks that the response echoes the tenant id in `X-Tenant-Id`.
//
// NOTE(review): `tenantCredentialsFeeder` is not defined in this snippet; it presumably
// provides `tenantId`, `username` and `password` — confirm against the project.
val multiTenantBasicAuth = scenario("Multi-tenant Basic Auth")
  .feed(tenantCredentialsFeeder)
  .exec { session =>
    val username = session("username").as[String]
    val password = session("password").as[String]
    // Encode with an explicit charset so the header does not depend on the JVM default
    // encoding when credentials contain non-ASCII characters.
    val credentialBytes =
      s"$username:$password".getBytes(java.nio.charset.StandardCharsets.UTF_8)
    val encodedCredentials = java.util.Base64.getEncoder.encodeToString(credentialBytes)
    session.set("authHeader", s"Basic $encodedCredentials")
  }
  .exec(http("Tenant-specific Auth")
    .get("/${tenantId}/resources")
    .header("Authorization", "${authHeader}")
    .check(header("X-Tenant-Id").is("${tenantId}")))


3. Digest认证详细实现

Digest认证流程自动化


scala

/**
 * Gatling simulation that performs an HTTP Digest challenge/response exchange by hand:
 * request the resource, capture the `WWW-Authenticate` challenge from the 401, compute the
 * digest, then repeat the request with an `Authorization` header.
 *
 * The scenario reads `username` and `password` from the session.
 * NOTE(review): nothing in this class feeds those attributes — confirm the caller does.
 */
class DigestAuthSimulation extends Simulation {

  // Protocol configuration with protocol-level Digest credentials.
  // NOTE(review): with `digestAuth` configured the HTTP client may answer the challenge on
  // its own, in which case the manual flow below would not observe the 401 — confirm before
  // using this protocol together with `digestScenario`.
  val digestProtocol = http
    .baseUrl("https://digest.example.com")
    .digestAuth("username", "password")
    .disableWarmUp // original author's rationale: avoid nonce conflicts from warm-up (unverified)

  val digestScenario = scenario("Digest Authentication Flow")
    .exec(http("Initial Challenge Request")
      .get("/protected")
      .check(status.is(401))
      .check(header("WWW-Authenticate").saveAs("authChallenge")))
    .exec { session =>
      // Parse the Digest challenge parameters.
      val challenge = session("authChallenge").as[String]
      val realm = extractDigestParam(challenge, "realm")
      val nonce = extractDigestParam(challenge, "nonce")
      if (realm.isEmpty || nonce.isEmpty) {
        // A challenge without realm/nonce cannot be answered; fail instead of sending a
        // header computed from empty values.
        session.markAsFailed
      } else {
        val digestResponse = calculateDigestResponse(
          session("username").as[String],
          session("password").as[String],
          realm, nonce, "GET", "/protected"
        )
        session.set("digestResponse", digestResponse)
      }
    }
    .exec(http("Authenticated Request")
      .get("/protected")
      .header("Authorization", "${digestResponse}")
      .check(status.is(200)))

  /**
   * Returns the value of a quoted `param="value"` directive in a challenge header.
   *
   * @param challenge raw `WWW-Authenticate` header value
   * @param param     directive name, matched literally
   * @return the directive value, or an empty string when the directive is absent
   */
  private def extractDigestParam(challenge: String, param: String): String = {
    // Quote the name so regex metacharacters in it cannot change the pattern.
    val pattern = (java.util.regex.Pattern.quote(param) + "=\"([^\"]+)\"").r
    pattern.findFirstMatchIn(challenge).map(_.group(1)).getOrElse("")
  }

  /**
   * Builds a complete `Authorization` header value for an MD5 Digest response.
   *
   * Uses the basic form without `qop`: response = MD5(HA1:nonce:HA2), where
   * HA1 = MD5(username:realm:password) and HA2 = MD5(method:uri).
   * NOTE(review): servers that require `qop=auth` (cnonce/nc) or a non-MD5 algorithm are
   * not covered by this six-argument signature.
   *
   * @return header value of the form `Digest username="...", realm="...", ...`
   */
  private def calculateDigestResponse(
      username: String,
      password: String,
      realm: String,
      nonce: String,
      method: String,
      uri: String
  ): String = {
    val ha1 = md5Hex(s"$username:$realm:$password")
    val ha2 = md5Hex(s"$method:$uri")
    val response = md5Hex(s"$ha1:$nonce:$ha2")
    Seq(
      "username" -> username,
      "realm" -> realm,
      "nonce" -> nonce,
      "uri" -> uri,
      "response" -> response
    ).map { case (key, value) => key + "=\"" + value + "\"" }
      .mkString("Digest ", ", ", "")
  }

  /** Lower-case hexadecimal MD5 of the UTF-8 bytes of `text`. */
  private def md5Hex(text: String): String =
    java.security.MessageDigest
      .getInstance("MD5")
      .digest(text.getBytes(java.nio.charset.StandardCharsets.UTF_8))
      .map(b => "%02x".format(b & 0xff))
      .mkString
}


4. OAuth 2.0完整流程模拟

OAuth 2.0授权码流程


scala

/**
 * Gatling simulation of the OAuth 2.0 authorization-code flow: authorization request,
 * extraction of the code from the redirect, code-for-token exchange, token refresh and a
 * final API call with the refreshed bearer token.
 *
 * The requests read these session attributes: `authorizationEndpoint`, `tokenEndpoint`,
 * `apiEndpoint`, `clientId`, `clientSecret`, `redirectUri`, `scope`, `state`, `userId`.
 * NOTE(review): nothing in this class feeds them — confirm the caller attaches a feeder.
 */
class OAuth2AuthorizationCodeSimulation extends Simulation {

  val oauth2Scenario = scenario("OAuth 2.0 Authorization Code Flow")
    .exec(initiateAuthorization)
    .exec(handleAuthorizationCallback)
    .exec(exchangeCodeForToken)
    .exec(refreshAccessToken)
    .exec(apiCallsWithToken)

  // Step 1: start the authorization request.
  private def initiateAuthorization =
    http("Initiate OAuth Authorization")
      .get("${authorizationEndpoint}")
      .queryParam("response_type", "code")
      .queryParam("client_id", "${clientId}")
      .queryParam("redirect_uri", "${redirectUri}")
      .queryParam("scope", "${scope}")
      .queryParam("state", "${state}")
      // Redirects are followed automatically by default; that would hide the 302 and its
      // Location header, so redirect following is switched off for this request.
      .disableFollowRedirect
      .check(status.is(302))
      .check(header("Location").saveAs("redirectLocation"))

  // Step 2: handle the authorization callback by pulling the code out of the redirect URL.
  private def handleAuthorizationCallback =
    exec { session =>
      val location = session("redirectLocation").as[String]
      extractAuthorizationCode(location) match {
        case Some(code) => session.set("authorizationCode", code)
        // No code in the redirect (e.g. an `error=` response): fail the virtual user here
        // rather than sending a meaningless token request.
        case None => session.markAsFailed
      }
    }

  /**
   * Extracts the `code` query parameter from a redirect URL.
   *
   * @param location value of the `Location` header
   * @return the URL-decoded authorization code, or `None` when no `code` parameter exists
   */
  private def extractAuthorizationCode(location: String): Option[String] =
    "[?&]code=([^&#]+)".r
      .findFirstMatchIn(location)
      .map(m => java.net.URLDecoder.decode(m.group(1), "UTF-8"))

  // Step 3: exchange the authorization code for tokens.
  private def exchangeCodeForToken =
    http("Exchange Code for Token")
      .post("${tokenEndpoint}")
      .formParam("grant_type", "authorization_code")
      .formParam("code", "${authorizationCode}")
      .formParam("redirect_uri", "${redirectUri}")
      .formParam("client_id", "${clientId}")
      .formParam("client_secret", "${clientSecret}")
      .check(status.is(200))
      .check(jsonPath("$.access_token").saveAs("accessToken"))
      .check(jsonPath("$.refresh_token").saveAs("refreshToken"))
      .check(jsonPath("$.expires_in").saveAs("tokenExpiry"))

  // Step 4: refresh the access token.
  private def refreshAccessToken =
    http("Refresh Access Token")
      .post("${tokenEndpoint}")
      .formParam("grant_type", "refresh_token")
      .formParam("refresh_token", "${refreshToken}")
      .formParam("client_id", "${clientId}")
      .formParam("client_secret", "${clientSecret}")
      .check(status.is(200))
      .check(jsonPath("$.access_token").saveAs("newAccessToken"))

  // Step 5: call the API with the refreshed bearer token.
  private def apiCallsWithToken =
    http("API Call with Bearer Token")
      .get("${apiEndpoint}")
      .header("Authorization", "Bearer ${newAccessToken}")
      .check(status.is(200))
      .check(jsonPath("$.user.id").is("${userId}"))
}


OAuth 2.0客户端凭证流程


scala

/**
 * Gatling simulation of the OAuth 2.0 client-credentials flow: obtain a token once, then
 * call the API for 300 seconds, re-acquiring the token whenever the stored expiry has passed.
 *
 * NOTE(review): `clientCredentialsFeeder` is not defined in this snippet; it presumably
 * supplies `tokenEndpoint`, `clientId`, `clientSecret`, `scope` and `apiBaseUrl` — confirm.
 */
class OAuth2ClientCredentialsSimulation extends Simulation {

  // Brings `.seconds` / `.second` into scope for the durations below.
  import scala.concurrent.duration._

  val clientCredentialsScenario = scenario("OAuth 2.0 Client Credentials Flow")
    .feed(clientCredentialsFeeder)
    .exec(acquireClientCredentialsToken)
    .during(300.seconds) {
      exec(makeAuthenticatedApiCall)
        .pause(1.second, 5.seconds)
    }

  // Requests a token, then converts the relative `expires_in` (seconds) into an absolute
  // epoch-millisecond deadline stored under `tokenExpiry`.
  private def acquireClientCredentialsToken =
    // The request is wrapped in `exec` so the session function can be chained after it;
    // a request builder has no `exec` member of its own.
    exec(
      http("Get Client Credentials Token")
        .post("${tokenEndpoint}")
        .formParam("grant_type", "client_credentials")
        .formParam("client_id", "${clientId}")
        .formParam("client_secret", "${clientSecret}")
        .formParam("scope", "${scope}")
        .check(status.is(200))
        .check(jsonPath("$.access_token").saveAs("accessToken"))
        .check(jsonPath("$.expires_in").saveAs("expiresIn"))
    ).exec { session =>
      val expiryTime = System.currentTimeMillis() + session("expiresIn").as[Long] * 1000
      session.set("tokenExpiry", expiryTime)
    }

  // Re-acquires the token if the stored deadline has passed, then performs the API call.
  private def makeAuthenticatedApiCall =
    doIf(session => System.currentTimeMillis() > session("tokenExpiry").as[Long]) {
      exec(acquireClientCredentialsToken)
    }.exec(
      http("Service-to-Service API Call")
        .get("${apiBaseUrl}/resources")
        .header("Authorization", "Bearer ${accessToken}")
        .check(status.in(200, 204))
    )
}


5. JWT令牌流程高级实现

JWT生成和验证流程


scala

/**
 * Gatling simulation of a JWT login flow: log in, decode the token's claims into the
 * session, call a protected API for 10 minutes and refresh the token shortly before it
 * expires.
 *
 * The requests read `authEndpoint`, `apiBaseUrl`, `username` and `password` from the session.
 * NOTE(review): nothing in this class feeds them, and `handleJWTExpiration` is referenced
 * but not defined in this snippet — confirm both against the surrounding project.
 */
class JWTFlowSimulation extends Simulation {

  // Brings `.minutes` / `.seconds` into scope for the durations below.
  import scala.concurrent.duration._

  val jwtScenario = scenario("JWT Authentication Flow")
    .exec(loginAndAcquireJWT)
    .exec(verifyJWTStructure)
    .exec(makeJWTProtectedCalls)
    .exec(handleJWTExpiration)

  // Logs in and stores the returned token (and the optional refresh-token header).
  private def loginAndAcquireJWT =
    http("Login to Get JWT")
      .post("${authEndpoint}/login")
      .body(StringBody("""{"username":"${username}","password":"${password}"}"""))
      .check(status.is(200))
      .check(jsonPath("$.token").saveAs("jwtToken"))
      .check(header("X-Refresh-Token").optional.saveAs("refreshToken"))

  // Decodes the payload segment of `jwtToken` and copies `exp`, `sub` and `iss` into the
  // session. The signature is not verified; this is for test bookkeeping only.
  private def verifyJWTStructure =
    exec { session =>
      val jwtToken = session("jwtToken").as[String]
      try {
        val parts = jwtToken.split("\\.")
        // Decode with an explicit charset so the result does not depend on the JVM default.
        val payload = new String(
          java.util.Base64.getUrlDecoder.decode(parts(1)),
          java.nio.charset.StandardCharsets.UTF_8
        )
        val claims = ujson.read(payload)

        session
          .set("jwtExpiry", claims("exp").num.toLong)
          .set("jwtSubject", claims("sub").str)
          .set("jwtIssuer", claims("iss").str)
      } catch {
        // Malformed token or missing claim: fail the virtual user, but let fatal JVM
        // errors propagate.
        case scala.util.control.NonFatal(_) => session.markAsFailed
      }
    }

  // Calls the protected API in a loop, refreshing the token when it is close to expiry.
  private def makeJWTProtectedCalls =
    during(10.minutes) {
      exec { session =>
        val currentTime = System.currentTimeMillis() / 1000
        val tokenExpiry = session("jwtExpiry").as[Long]
        // Trigger a refresh once fewer than 30 seconds remain before expiry.
        session.set("shouldRefresh", tokenExpiry - currentTime < 30)
      }
      .doIf("${shouldRefresh}") {
        // Re-decode after refreshing so `jwtExpiry` reflects the new token; otherwise the
        // stale expiry would trigger a refresh on every remaining iteration.
        exec(refreshJWTToken)
          .exec(verifyJWTStructure)
      }
      .exec(
        http("JWT Protected API")
          .get("${apiBaseUrl}/user/${jwtSubject}/profile")
          .header("Authorization", "Bearer ${jwtToken}")
          .check(status.is(200))
          .check(jsonPath("$.user.id").is("${jwtSubject}"))
      )
      .pause(5.seconds, 15.seconds)
    }

  // Exchanges the refresh token for a new JWT and overwrites `jwtToken`.
  private def refreshJWTToken =
    http("Refresh JWT Token")
      .post("${authEndpoint}/refresh")
      .header("Authorization", "Bearer ${jwtToken}")
      .body(StringBody("""{"refresh_token":"${refreshToken}"}"""))
      .check(status.is(200))
      .check(jsonPath("$.token").saveAs("jwtToken"))
}


自定义JWT生成器(用于测试)


scala

/** Helpers for producing unsigned, test-only JWTs and a scenario that uses them. */
object JWTGenerator {

  /**
   * Builds a JWT-shaped string with an HS256 header, the given claims and a placeholder
   * signature. The result is NOT cryptographically valid and is meant for tests only.
   *
   * @param username      value of the `sub` claim
   * @param roles         value of the `roles` claim
   * @param expiryMinutes lifetime in minutes, added to the issue time to form `exp`
   * @return `header.payload.signature`, with header and payload Base64URL-encoded
   */
  def generateTestJWT(username: String, roles: List[String], expiryMinutes: Int = 60): String = {
    val header = """{"alg":"HS256","typ":"JWT"}"""
    // Read the clock once so `iat` and `exp` are derived from the same instant.
    val issuedAt = System.currentTimeMillis() / 1000
    val payload = ujson.Obj(
      "sub" -> username,
      "iss" -> "test-issuer",
      // Long arithmetic avoids Int overflow for very large lifetimes.
      "exp" -> (issuedAt + expiryMinutes * 60L),
      "iat" -> issuedAt,
      "roles" -> roles
    )

    // JWT segments are Base64URL without '=' padding; encode bytes as UTF-8 explicitly.
    val encoder = java.util.Base64.getUrlEncoder.withoutPadding
    val utf8 = java.nio.charset.StandardCharsets.UTF_8
    val encodedHeader = encoder.encodeToString(header.getBytes(utf8))
    val encodedPayload = encoder.encodeToString(payload.toString().getBytes(utf8))

    s"$encodedHeader.$encodedPayload.signature" // deliberately invalid signature, tests only
  }

  // Scenario that sends a generated token to an admin endpoint.
  // NOTE(review): reads `username` and `apiBaseUrl` from the session; nothing here feeds them.
  val customJwtScenario = scenario("Custom JWT Testing")
    .exec { session =>
      val customJWT = generateTestJWT(
        session("username").as[String],
        List("user", "admin")
      )
      session.set("customJWT", customJWT)
    }
    .exec(
      http("API with Custom JWT")
        .get("${apiBaseUrl}/admin/resources")
        .header("Authorization", "Bearer ${customJWT}")
        .check(status.in(200, 403)) // 403 is accepted because the signature is invalid
    )
}


6. 混合认证策略和性能测试

多认证类型混合负载测试


scala

/**
 * Load test that runs the Basic, Digest, OAuth2 and JWT scenarios side by side, each
 * ramped up over 60 seconds with the user count given in `userWeights`.
 */
class MixedAuthLoadTest extends Simulation {

  val httpProtocol = http
    .baseUrl("https://api.example.com")
    .acceptEncodingHeader("gzip, deflate")
    .userAgentHeader("Gatling-Auth-Test/1.0")

  // Users per authentication type. The values are passed to `rampUsers` as absolute user
  // counts; they read as percentages only because they add up to 100.
  val userWeights = Map(
    "basic" -> 30,
    "digest" -> 20,
    "oauth2" -> 40,
    "jwt" -> 10
  )

  // The scenarios are members of simulation classes, not of companion objects, so an
  // instance is created to reach them.
  private val basicUsers = scenario("Basic Auth Users")
    .exec(new BasicAuthSimulation().basicAuthScenario)

  private val digestUsers = scenario("Digest Auth Users")
    .exec(new DigestAuthSimulation().digestScenario)

  private val oauth2Users = scenario("OAuth2 Users")
    .exec(new OAuth2AuthorizationCodeSimulation().oauth2Scenario)

  private val jwtUsers = scenario("JWT Users")
    .exec(new JWTFlowSimulation().jwtScenario)

  // `setUp` must run while the simulation is being constructed; when it was wrapped in a
  // method that nothing calls, no scenario was ever registered.
  private val configuredSetUp: SetUp =
    setUp(
      basicUsers.inject(rampUsers(userWeights("basic")).during(60)),
      digestUsers.inject(rampUsers(userWeights("digest")).during(60)),
      oauth2Users.inject(rampUsers(userWeights("oauth2")).during(60)),
      jwtUsers.inject(rampUsers(userWeights("jwt")).during(60))
    ).protocols(httpProtocol)

  /** Returns the set-up registered at construction time (kept for existing callers). */
  def apply(): SetUp = configuredSetUp
}


认证性能指标和断言


scala

/**
 * Collections of Gatling assertions for the authentication scenarios.
 *
 * NOTE(review): nothing in this snippet attaches these sequences to a `setUp(...)`;
 * confirm where they are applied.
 */
class AuthPerformanceAssertions {

  val globalAssertions = Seq(
    // Global performance assertions across all requests.
    global.responseTime.percentile4.lt(1000), // percentile4 response time < 1 s (original note: 99th percentile — confirm configuration)
    global.failedRequests.percent.lt(1.0),    // failure rate < 1%
    global.successfulRequests.percent.gt(99)  // success rate > 99%
  )

  val authSpecificAssertions = Seq(
    // Assertions on individual authentication requests, addressed by request name.
    details("Exchange Code for Token").responseTime.percentile4.lt(2000),
    details("Refresh Access Token").responseTime.max.lt(3000),
    details("Login to Get JWT").failedRequests.percent.lt(0.5),
    // Upper bound on the maximum response time, applied via `forAll`.
    // (This does not validate tokens; it only constrains response time.)
    forAll.responseTime.max.lt(5000),

    // Throughput of the authenticated API call under concurrent users.
    details("API Call with Bearer Token").requestsPerSec.gt(50)
  )
}


7. 高级配置和优化

认证测试环境配置


scala

/** Per-environment endpoint settings and shared HTTP tuning for the auth tests. */
object AuthTestConfig {

  // Endpoint settings keyed by environment name.
  val environments: Map[String, Map[String, String]] = Map(
    "dev" -> Map(
      "baseUrl" -> "https://dev-api.example.com",
      "authEndpoint" -> "https://dev-auth.example.com",
      "tokenEndpoint" -> "https://dev-auth.example.com/oauth/token"
    ),
    "staging" -> Map(
      "baseUrl" -> "https://staging-api.example.com",
      "authEndpoint" -> "https://staging-auth.example.com",
      "tokenEndpoint" -> "https://staging-auth.example.com/oauth/token"
    ),
    "prod" -> Map(
      "baseUrl" -> "https://api.example.com",
      "authEndpoint" -> "https://auth.example.com",
      "tokenEndpoint" -> "https://auth.example.com/oauth/token"
    )
  )

  /**
   * Looks up the settings for an environment.
   *
   * @param env environment name
   * @return the settings for `env`, or the `dev` settings when `env` is not a known key
   */
  def getConfig(env: String): Map[String, String] =
    environments.get(env) match {
      case Some(settings) => settings
      case None           => environments("dev")
    }

  // Authentication timeout and connection settings.
  // NOTE(review): confirm that `requestTimeout` and `connectTimeout` exist on the protocol
  // builder in the Gatling version in use.
  val authTimeouts = http
    .requestTimeout(30000)
    .connectTimeout(10000)
    .maxConnectionsPerHost(100)
}

上述Gatling认证测试实现覆盖了从简单的Basic认证、Digest认证到复杂的OAuth 2.0和JWT令牌流程等常见HTTP认证方式,有助于提高各种认证场景下性能测试的准确性和可靠性。

文章标签: 第三方软件测评报告 第三方软件测试报告 第三方软件测试 第三方软件测试公司 第三方软件测试机构
咨询软件测试