Files
kratos-bootstrap/rpc/grpc.go
2024-11-13 11:27:13 +08:00

208 lines
5.3 KiB
Go

package rpc
import (
"context"
"crypto/tls"
"strings"
"time"
"github.com/go-kratos/aegis/ratelimit"
"github.com/go-kratos/aegis/ratelimit/bbr"
"google.golang.org/grpc"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/middleware"
midRateLimit "github.com/go-kratos/kratos/v2/middleware/ratelimit"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/middleware/tracing"
"github.com/go-kratos/kratos/v2/middleware/validate"
kratosGrpc "github.com/go-kratos/kratos/v2/transport/grpc"
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
"github.com/tx7do/kratos-bootstrap/utils"
)
// defaultTimeout is the gRPC client timeout used by initGrpcClientConfig when
// the client configuration does not specify one.
const defaultTimeout time.Duration = 5 * time.Second
// CreateGrpcClient creates a gRPC client connection to serviceName, resolved
// through the discovery r.
//
// serviceName may be given with or without the "discovery:///" scheme prefix;
// the prefix is added when missing. Options are applied in this order: the
// caller-supplied opts, the discovery and endpoint options, then the options
// derived from cfg by initGrpcClientConfig.
//
// The connection is opened with kratosGrpc.DialInsecure. A dial error is
// reported through log.Fatalf rather than returned to the caller.
func CreateGrpcClient(ctx context.Context, r registry.Discovery, serviceName string, cfg *conf.Bootstrap, opts ...kratosGrpc.ClientOption) grpc.ClientConnInterface {
	const scheme = "discovery:///"

	// Normalise the target so that it always carries the discovery scheme.
	target := serviceName
	if !strings.HasPrefix(target, scheme) {
		target = scheme + target
	}

	dialOpts := make([]kratosGrpc.ClientOption, 0, len(opts)+2)
	dialOpts = append(dialOpts, opts...)
	dialOpts = append(dialOpts, kratosGrpc.WithDiscovery(r), kratosGrpc.WithEndpoint(target))
	dialOpts = append(dialOpts, initGrpcClientConfig(cfg)...)

	conn, err := kratosGrpc.DialInsecure(ctx, dialOpts...)
	if err != nil {
		log.Fatalf("dial grpc client [%s] failed: %s", serviceName, err.Error())
	}

	return conn
}
// initGrpcClientConfig translates the gRPC client section of the bootstrap
// configuration into kratos gRPC client options.
//
// It returns nil when cfg, cfg.Client or cfg.Client.Grpc is nil. Otherwise the
// result contains, in order:
//   - a timeout option (defaultTimeout unless the configuration sets one),
//   - a middleware option, when at least one middleware is enabled,
//   - a TLS option, when TLS material is configured and loads successfully.
//
// It panics if the configured TLS material cannot be loaded.
func initGrpcClientConfig(cfg *conf.Bootstrap) []kratosGrpc.ClientOption {
	if cfg == nil || cfg.Client == nil || cfg.Client.Grpc == nil {
		return nil
	}
	grpcCfg := cfg.Client.Grpc

	var options []kratosGrpc.ClientOption

	timeout := defaultTimeout
	if grpcCfg.Timeout != nil {
		timeout = grpcCfg.Timeout.AsDuration()
	}
	options = append(options, kratosGrpc.WithTimeout(timeout))

	if mwCfg := grpcCfg.Middleware; mwCfg != nil {
		var ms []middleware.Middleware
		if mwCfg.GetEnableRecovery() {
			ms = append(ms, recovery.Recovery())
		}
		if mwCfg.GetEnableTracing() {
			ms = append(ms, tracing.Client())
		}
		if mwCfg.GetEnableValidate() {
			ms = append(ms, validate.Validator())
		}
		// The chain used to be built and then discarded, so none of the
		// enabled middlewares ever reached the client. Only emit the option
		// when something is enabled, so that an all-disabled configuration
		// does not replace middleware supplied through other options.
		if len(ms) > 0 {
			options = append(options, kratosGrpc.WithMiddleware(ms...))
		}
	}

	if tlsSection := grpcCfg.Tls; tlsSection != nil {
		var tlsCfg *tls.Config
		var err error

		// File-based material takes precedence over inline PEM material.
		if tlsSection.File != nil {
			if tlsCfg, err = utils.LoadClientTlsConfigFile(
				tlsSection.File.GetKeyPath(),
				tlsSection.File.GetCertPath(),
				tlsSection.File.GetCaPath(),
			); err != nil {
				panic(err)
			}
		}
		if tlsCfg == nil && tlsSection.Config != nil {
			if tlsCfg, err = utils.LoadClientTlsConfig(
				tlsSection.Config.GetKeyPem(),
				tlsSection.Config.GetCertPem(),
				tlsSection.Config.GetCaPem(),
			); err != nil {
				panic(err)
			}
		}

		if tlsCfg != nil {
			options = append(options, kratosGrpc.WithTLSConfig(tlsCfg))
		}
	}

	return options
}
// CreateGrpcServer creates a kratos gRPC server.
//
// The caller-supplied opts are applied first, followed by the options derived
// from cfg by initGrpcServerConfig.
func CreateGrpcServer(cfg *conf.Bootstrap, opts ...kratosGrpc.ServerOption) *kratosGrpc.Server {
	serverOpts := make([]kratosGrpc.ServerOption, 0, len(opts))
	serverOpts = append(serverOpts, opts...)
	serverOpts = append(serverOpts, initGrpcServerConfig(cfg)...)

	return kratosGrpc.NewServer(serverOpts...)
}
// initGrpcServerConfig translates the gRPC server section of the bootstrap
// configuration into kratos gRPC server options.
//
// It returns nil when cfg, cfg.Server or cfg.Server.Grpc is nil. Otherwise it
// emits, in order and each only when configured: the middleware chain, the TLS
// configuration, the network, the listen address and the timeout.
//
// It panics if the configured TLS material cannot be loaded.
func initGrpcServerConfig(cfg *conf.Bootstrap) []kratosGrpc.ServerOption {
	if cfg == nil || cfg.Server == nil || cfg.Server.Grpc == nil {
		return nil
	}
	grpcCfg := cfg.Server.Grpc

	var options []kratosGrpc.ServerOption

	if mwCfg := grpcCfg.Middleware; mwCfg != nil {
		var ms []middleware.Middleware
		if mwCfg.GetEnableRecovery() {
			ms = append(ms, recovery.Recovery())
		}
		if mwCfg.GetEnableTracing() {
			ms = append(ms, tracing.Server())
		}
		if mwCfg.GetEnableValidate() {
			ms = append(ms, validate.Validator())
		}
		// NOTE(review): the EnableCircuitBreaker flag is not acted upon here;
		// the original branch for it was empty, so no middleware is installed.

		if limiterCfg := mwCfg.Limiter; limiterCfg != nil {
			var limiter ratelimit.Limiter
			switch name := limiterCfg.GetName(); name {
			case "bbr":
				limiter = bbr.NewLimiter()
			default:
				log.Warnf("unknown grpc server limiter [%s], using the rate limit middleware default", name)
			}

			// Never hand a nil limiter to WithLimiter: the middleware would
			// store it and dereference it on the first request. Without the
			// option the middleware keeps its own default limiter.
			if limiter != nil {
				ms = append(ms, midRateLimit.Server(midRateLimit.WithLimiter(limiter)))
			} else {
				ms = append(ms, midRateLimit.Server())
			}
		}

		options = append(options, kratosGrpc.Middleware(ms...))
	}

	if tlsSection := grpcCfg.Tls; tlsSection != nil {
		var tlsCfg *tls.Config
		var err error

		// File-based material takes precedence over inline PEM material.
		if tlsSection.File != nil {
			if tlsCfg, err = utils.LoadServerTlsConfigFile(
				tlsSection.File.GetKeyPath(),
				tlsSection.File.GetCertPath(),
				tlsSection.File.GetCaPath(),
				tlsSection.InsecureSkipVerify,
			); err != nil {
				panic(err)
			}
		}
		if tlsCfg == nil && tlsSection.Config != nil {
			if tlsCfg, err = utils.LoadServerTlsConfig(
				tlsSection.Config.GetKeyPem(),
				tlsSection.Config.GetCertPem(),
				tlsSection.Config.GetCaPem(),
				tlsSection.InsecureSkipVerify,
			); err != nil {
				panic(err)
			}
		}

		if tlsCfg != nil {
			options = append(options, kratosGrpc.TLSConfig(tlsCfg))
		}
	}

	if grpcCfg.Network != "" {
		options = append(options, kratosGrpc.Network(grpcCfg.Network))
	}
	if grpcCfg.Addr != "" {
		options = append(options, kratosGrpc.Address(grpcCfg.Addr))
	}
	if grpcCfg.Timeout != nil {
		options = append(options, kratosGrpc.Timeout(grpcCfg.Timeout.AsDuration()))
	}

	return options
}