Files
go-utils/jwtutil/jwt_test.go
2025-05-23 14:31:59 +08:00

520 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package jwtutil
import (
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/golang-jwt/jwt/v5"
"github.com/stretchr/testify/assert"
_ "github.com/go-kratos/kratos/v2/encoding/json"
_ "github.com/go-kratos/kratos/v2/encoding/proto"
)
func TestParseJWTPayload(t *testing.T) {
var secretKey = []byte("secret")
// 测试有效的 JWT 负载解析
token, err := GenerateJWT(jwt.MapClaims{
"sub": "userId",
"exp": 1672728000,
}, secretKey, jwt.SigningMethodHS256)
if err != nil {
t.Fatalf("failed to create token: %v", err)
}
result, err := ParseJWTPayload(token)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, "userId", result["sub"])
assert.Equal(t, float64(1672728000), result["exp"]) // 注意JSON 解码后数字为 float64
// 测试无效的 JWT
invalidToken := "invalid.token.string"
result, err = ParseJWTPayload(invalidToken)
assert.Error(t, err)
assert.Nil(t, result)
// 测试空字符串
result, err = ParseJWTPayload("")
assert.Error(t, err)
assert.Nil(t, result)
}
func TestParseJWTClaimsToStruct(t *testing.T) {
// 测试有效的 JWT 负载解析
type Payload struct {
Sub string `json:"sub"`
Exp int64 `json:"exp"`
}
var secretKey = []byte("secret")
// 验证签名以生成有效的 token
token, err := GenerateJWT(jwt.MapClaims{
"sub": "userId",
"exp": 1672728000,
}, secretKey, jwt.SigningMethodHS256)
if err != nil {
t.Fatalf("failed to create token: %v", err)
}
result, err := ParseJWTClaimsToStruct[Payload](token)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, "userId", result.Sub)
assert.Equal(t, int64(1672728000), result.Exp)
// 测试无效的 JWT
invalidToken := "invalid.token.string"
result, err = ParseJWTClaimsToStruct[Payload](invalidToken)
assert.Error(t, err)
assert.Nil(t, result)
// 测试空字符串
result, err = ParseJWTClaimsToStruct[Payload]("")
assert.Error(t, err)
assert.Nil(t, result)
}
func TestVerifyJWT(t *testing.T) {
// 测试有效的 JWT 验证
secretKey := []byte("secret")
token, err := GenerateJWT(jwt.MapClaims{
"sub": "userId",
}, secretKey, jwt.SigningMethodHS256)
if err != nil {
t.Fatalf("failed to create token: %v", err)
}
result, err := VerifyJWT(token, secretKey)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, "userId", result["sub"])
// 测试无效的签名
invalidSecretKey := []byte("invalid_secret")
result, err = VerifyJWT(token, invalidSecretKey)
assert.Error(t, err)
assert.Nil(t, result)
// 测试无效的 JWT 格式
invalidToken := "invalid.token.string"
result, err = VerifyJWT(invalidToken, secretKey)
assert.Error(t, err)
assert.Nil(t, result)
// 测试空字符串
result, err = VerifyJWT("", secretKey)
assert.Error(t, err)
assert.Nil(t, result)
}
func TestGenerateGenericJWT(t *testing.T) {
// 定义测试用的 payload 和密钥
type Payload struct {
Sub string `json:"sub"`
Exp int64 `json:"exp"`
}
secretKey := []byte("secret")
payload := Payload{
Sub: "userId",
Exp: 1672728000,
}
// 测试生成有效的 JWT
token, err := GenerateGenericJWT(payload, secretKey, jwt.SigningMethodHS256)
assert.NoError(t, err)
assert.NotEmpty(t, token)
// 验证生成的 JWT 是否正确
parsedPayload, err := ParseJWTClaimsToStruct[Payload](token)
assert.NoError(t, err)
assert.NotNil(t, parsedPayload)
assert.Equal(t, payload.Sub, parsedPayload.Sub)
assert.Equal(t, payload.Exp, parsedPayload.Exp)
// 测试使用空密钥生成 JWT
emptySecretKey := []byte("")
token, err = GenerateGenericJWT(payload, emptySecretKey, jwt.SigningMethodHS256)
assert.Error(t, err)
assert.Empty(t, token)
}
func TestGenerateJWT(t *testing.T) {
// 定义测试用的 payload 和密钥
payload := jwt.MapClaims{
"sub": "userId",
"exp": 1672728000,
}
secretKey := []byte("secret")
// 测试生成有效的 JWT
token, err := GenerateJWT(payload, secretKey, jwt.SigningMethodHS256)
assert.NoError(t, err)
assert.NotEmpty(t, token)
// 验证生成的 JWT 是否正确
parsedPayload, err := ParseJWTPayload(token)
assert.NoError(t, err)
assert.NotNil(t, parsedPayload)
assert.Equal(t, payload["sub"], parsedPayload["sub"])
assert.Equal(t, float64(payload["exp"].(int)), parsedPayload["exp"].(float64)) // 注意JSON 解码后数字为 float64
// 测试使用空密钥生成 JWT
emptySecretKey := []byte("")
token, err = GenerateJWT(payload, emptySecretKey, jwt.SigningMethodHS256)
assert.Error(t, err)
assert.Empty(t, token)
}
func TestRefreshJWT(t *testing.T) {
secretKey := []byte("secret")
// 创建一个初始的 JWT
originalToken, err := GenerateJWT(jwt.MapClaims{
"sub": "userId",
"exp": time.Now().Add(1 * time.Hour).Unix(),
}, secretKey, jwt.SigningMethodHS256)
if err != nil {
t.Fatalf("failed to create token: %v", err)
}
// 刷新 JWT设置新的过期时间
newExpiration := time.Now().Add(2 * time.Hour)
refreshedToken, err := RefreshJWT(originalToken, secretKey, newExpiration)
assert.NoError(t, err)
assert.NotEmpty(t, refreshedToken)
// 验证刷新后的 JWT 是否包含新的过期时间
claims, err := ParseJWTPayload(refreshedToken)
assert.NoError(t, err)
assert.NotNil(t, claims)
exp, ok := claims["exp"].(float64)
assert.True(t, ok)
assert.Equal(t, newExpiration.Unix(), int64(exp))
// 测试无效的 JWT
invalidToken := "invalid.token.string"
_, err = RefreshJWT(invalidToken, secretKey, newExpiration)
assert.Error(t, err)
// 测试空字符串
_, err = RefreshJWT("", secretKey, newExpiration)
assert.Error(t, err)
}
func TestGenerateJWTWithHeader(t *testing.T) {
// 定义测试用的 payload、密钥和自定义头部
payload := jwt.MapClaims{
"sub": "userId",
"exp": time.Now().Add(1 * time.Hour).Unix(),
}
secretKey := []byte("secret")
customHeader := map[string]interface{}{
"kid": "key-id-123",
}
// 测试生成有效的 JWT
token, err := GenerateJWTWithHeader(payload, secretKey, jwt.SigningMethodHS256, customHeader)
assert.NoError(t, err)
assert.NotEmpty(t, token)
// 验证生成的 JWT 是否包含自定义头部
parser := jwt.NewParser(jwt.WithoutClaimsValidation())
parsedToken, _, err := parser.ParseUnverified(token, jwt.MapClaims{})
assert.NoError(t, err)
assert.NotNil(t, parsedToken)
// 检查头部是否包含自定义字段
assert.Equal(t, "key-id-123", parsedToken.Header["kid"])
// 测试使用空密钥生成 JWT
emptySecretKey := []byte("")
token, err = GenerateJWTWithHeader(payload, emptySecretKey, jwt.SigningMethodHS256, customHeader)
assert.Error(t, err)
assert.Empty(t, token)
}
func TestExtractJWTFromRequest(t *testing.T) {
// 测试有效的 Authorization Header
req := httptest.NewRequest(http.MethodGet, "/", nil)
req.Header.Set("Authorization", "Bearer valid.jwt.token")
token, err := ExtractJWTFromRequest(req)
assert.NoError(t, err)
assert.Equal(t, "valid.jwt.token", token)
// 测试缺少 Authorization Header
req = httptest.NewRequest(http.MethodGet, "/", nil)
token, err = ExtractJWTFromRequest(req)
assert.Error(t, err)
assert.Equal(t, "authorization header is missing", err.Error())
assert.Empty(t, token)
// 测试无效的 Authorization Header 格式
req = httptest.NewRequest(http.MethodGet, "/", nil)
req.Header.Set("Authorization", "InvalidFormat")
token, err = ExtractJWTFromRequest(req)
assert.Error(t, err)
assert.Equal(t, "invalid authorization header format", err.Error())
assert.Empty(t, token)
// 测试空的 Bearer Token
req = httptest.NewRequest(http.MethodGet, "/", nil)
req.Header.Set("Authorization", "Bearer ")
token, err = ExtractJWTFromRequest(req)
assert.Error(t, err)
assert.Empty(t, token)
}
func TestGenerateShortLivedJWT(t *testing.T) {
secretKey := []byte("secret")
payload := jwt.MapClaims{
"sub": "userId",
}
// 测试生成短期有效的 JWT
duration := 1 * time.Hour
token, err := GenerateShortLivedJWT(payload, secretKey, jwt.SigningMethodHS256, duration)
assert.NoError(t, err)
assert.NotEmpty(t, token)
// 验证生成的 JWT 是否包含正确的过期时间
claims, err := ParseJWTPayload(token)
assert.NoError(t, err)
assert.NotNil(t, claims)
exp, ok := claims["exp"].(float64)
assert.True(t, ok)
expectedExp := time.Now().Add(duration).Unix()
assert.InDelta(t, expectedExp, int64(exp), 5) // 允许 5 秒的时间误差
// 测试空密钥
emptySecretKey := []byte("")
token, err = GenerateShortLivedJWT(payload, emptySecretKey, jwt.SigningMethodHS256, duration)
assert.Error(t, err)
assert.Empty(t, token)
}
func TestValidateJWTAudience(t *testing.T) {
secretKey := []byte("secret")
// 创建一个包含受众的 JWT
token, err := GenerateJWT(jwt.MapClaims{
"aud": []string{"audience1", "audience2"},
}, secretKey, jwt.SigningMethodHS256)
assert.NoError(t, err)
assert.NotEmpty(t, token)
// 测试受众验证成功
valid, err := ValidateJWTAudience(token, "audience1")
assert.NoError(t, err)
assert.True(t, valid)
// 测试受众验证失败
valid, err = ValidateJWTAudience(token, "audience3")
assert.NoError(t, err)
assert.False(t, valid)
// 测试无效的 JWT
invalidToken := "invalid.token.string"
valid, err = ValidateJWTAudience(invalidToken, "audience1")
assert.Error(t, err)
assert.False(t, valid)
// 测试空字符串
valid, err = ValidateJWTAudience("", "audience1")
assert.Error(t, err)
assert.False(t, valid)
}
func TestValidateJWTAlgorithm(t *testing.T) {
secretKey := []byte("secret")
// 测试有效的 JWT 算法验证
token, err := GenerateJWT(jwt.MapClaims{
"sub": "userId",
}, secretKey, jwt.SigningMethodHS256)
assert.NoError(t, err)
assert.NotEmpty(t, token)
valid, err := ValidateJWTAlgorithm(token, "HS256")
assert.NoError(t, err)
assert.True(t, valid)
// 测试无效的算法
valid, err = ValidateJWTAlgorithm(token, "RS256")
assert.NoError(t, err)
assert.False(t, valid)
// 测试无效的 JWT
invalidToken := "invalid.token.string"
valid, err = ValidateJWTAlgorithm(invalidToken, "HS256")
assert.Error(t, err)
assert.False(t, valid)
// 测试空字符串
valid, err = ValidateJWTAlgorithm("", "HS256")
assert.Error(t, err)
assert.False(t, valid)
}
func TestIsJWTExpired(t *testing.T) {
secretKey := []byte("secret")
// 测试未过期的 JWT
notExpiredToken, err := GenerateJWT(jwt.MapClaims{
"sub": "userId",
"exp": time.Now().Add(1 * time.Hour).Unix(),
}, secretKey, jwt.SigningMethodHS256)
assert.NoError(t, err)
assert.NotEmpty(t, notExpiredToken)
isExpired, err := IsJWTExpired(notExpiredToken)
assert.NoError(t, err)
assert.False(t, isExpired)
// 测试已过期的 JWT
expiredToken, err := GenerateJWT(jwt.MapClaims{
"sub": "userId",
"exp": time.Now().Add(-1 * time.Hour).Unix(),
}, secretKey, jwt.SigningMethodHS256)
assert.NoError(t, err)
assert.NotEmpty(t, expiredToken)
isExpired, err = IsJWTExpired(expiredToken)
assert.NoError(t, err)
assert.True(t, isExpired)
// 测试无效的 JWT
invalidToken := "invalid.token.string"
isExpired, err = IsJWTExpired(invalidToken)
assert.Error(t, err)
assert.False(t, isExpired)
// 测试空字符串
isExpired, err = IsJWTExpired("")
assert.Error(t, err)
assert.False(t, isExpired)
}
func TestGetJWTHeader(t *testing.T) {
secretKey := []byte("secret")
// 测试有效的 JWT 头部解析
token, err := GenerateJWT(jwt.MapClaims{
"sub": "userId",
}, secretKey, jwt.SigningMethodHS256)
assert.NoError(t, err)
assert.NotEmpty(t, token)
header, err := GetJWTHeader(token)
assert.NoError(t, err)
assert.NotNil(t, header)
assert.Equal(t, "JWT", header["typ"])
assert.Equal(t, "HS256", header["alg"])
// 测试无效的 JWT
invalidToken := "invalid.token.string"
header, err = GetJWTHeader(invalidToken)
assert.Error(t, err)
assert.Nil(t, header)
// 测试空字符串
header, err = GetJWTHeader("")
assert.Error(t, err)
assert.Nil(t, header)
}
func TestValidateJWTIssuer(t *testing.T) {
secretKey := []byte("secret")
// 测试有效的发行者验证
token, err := GenerateJWT(jwt.MapClaims{
"iss": "trusted-issuer",
}, secretKey, jwt.SigningMethodHS256)
assert.NoError(t, err)
assert.NotEmpty(t, token)
valid, err := ValidateJWTIssuer(token, "trusted-issuer")
assert.NoError(t, err)
assert.True(t, valid)
// 测试无效的发行者
valid, err = ValidateJWTIssuer(token, "untrusted-issuer")
assert.NoError(t, err)
assert.False(t, valid)
// 测试无效的 JWT
invalidToken := "invalid.token.string"
valid, err = ValidateJWTIssuer(invalidToken, "trusted-issuer")
assert.Error(t, err)
assert.False(t, valid)
// 测试空字符串
valid, err = ValidateJWTIssuer("", "trusted-issuer")
assert.Error(t, err)
assert.False(t, valid)
}
func TestGetJWTIssuedAt(t *testing.T) {
secretKey := []byte("secret")
// 测试有效的 JWT 签发时间
issuedAt := time.Now().Add(-1 * time.Hour).Unix()
token, err := GenerateJWT(jwt.MapClaims{
"sub": "userId",
"iat": issuedAt,
}, secretKey, jwt.SigningMethodHS256)
assert.NoError(t, err)
assert.NotEmpty(t, token)
iat, err := GetJWTIssuedAt(token)
assert.NoError(t, err)
assert.NotNil(t, iat)
assert.Equal(t, issuedAt, iat.Unix())
// 测试无效的 JWT
invalidToken := "invalid.token.string"
iat, err = GetJWTIssuedAt(invalidToken)
assert.Error(t, err)
assert.Nil(t, iat)
// 测试空字符串
iat, err = GetJWTIssuedAt("")
assert.Error(t, err)
assert.Nil(t, iat)
}
func TestGetJWTClaims(t *testing.T) {
secretKey := []byte("secret")
// 测试有效的 JWT
token, err := GenerateJWT(jwt.MapClaims{
"sub": "userId",
"exp": time.Now().Add(1 * time.Hour).Unix(),
}, secretKey, jwt.SigningMethodHS256)
assert.NoError(t, err)
assert.NotEmpty(t, token)
claims, err := GetJWTClaims(token)
assert.NoError(t, err)
assert.NotNil(t, claims)
assert.Equal(t, "userId", claims["sub"])
// 测试无效的 JWT
invalidToken := "invalid.token.string"
claims, err = GetJWTClaims(invalidToken)
assert.Error(t, err)
assert.Nil(t, claims)
// 测试空字符串
claims, err = GetJWTClaims("")
assert.Error(t, err)
assert.Nil(t, claims)
}