feat: registry.

This commit is contained in:
Bobo
2025-06-02 10:33:44 +08:00
parent 45a51d01a7
commit b0e91998e1
24 changed files with 1144 additions and 112 deletions

View File

@@ -1,18 +1,17 @@
package nacos
import (
nacosKratos "github.com/go-kratos/kratos/contrib/registry/nacos/v2"
"github.com/go-kratos/kratos/v2/log"
nacosClients "github.com/nacos-group/nacos-sdk-go/clients"
nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant"
nacosVo "github.com/nacos-group/nacos-sdk-go/vo"
nacosClients "github.com/nacos-group/nacos-sdk-go/v2/clients"
nacosConstant "github.com/nacos-group/nacos-sdk-go/v2/common/constant"
nacosVo "github.com/nacos-group/nacos-sdk-go/v2/vo"
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
)
// NewRegistry 创建一个注册发现客户端 - Nacos
func NewRegistry(c *conf.Registry) *nacosKratos.Registry {
func NewRegistry(c *conf.Registry) *Registry {
if c == nil || c.Nacos == nil {
return nil
}
@@ -43,7 +42,7 @@ func NewRegistry(c *conf.Registry) *nacosKratos.Registry {
log.Fatal(err)
}
reg := nacosKratos.New(cli)
reg := New(cli)
return reg
}

210
registry/nacos/registry.go Normal file
View File

@@ -0,0 +1,210 @@
package nacos
import (
"context"
"errors"
"fmt"
"math"
"net"
"net/url"
"strconv"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"github.com/go-kratos/kratos/v2/registry"
)
var ErrServiceInstanceNameEmpty = errors.New("kratos/nacos: ServiceInstance.Name can not be empty")
var (
_ registry.Registrar = (*Registry)(nil)
_ registry.Discovery = (*Registry)(nil)
)
type options struct {
prefix string
weight float64
cluster string
group string
kind string
}
// Option is nacos option.
type Option func(o *options)
// WithPrefix with prefix path.
func WithPrefix(prefix string) Option {
return func(o *options) { o.prefix = prefix }
}
// WithWeight with weight option.
func WithWeight(weight float64) Option {
return func(o *options) { o.weight = weight }
}
// WithCluster with cluster option.
func WithCluster(cluster string) Option {
return func(o *options) { o.cluster = cluster }
}
// WithGroup with group option.
func WithGroup(group string) Option {
return func(o *options) { o.group = group }
}
// WithDefaultKind with default kind option.
func WithDefaultKind(kind string) Option {
return func(o *options) { o.kind = kind }
}
// Registry is nacos registry.
type Registry struct {
opts options
cli naming_client.INamingClient
}
// New new a nacos registry.
func New(cli naming_client.INamingClient, opts ...Option) (r *Registry) {
op := options{
prefix: "/microservices",
cluster: "DEFAULT",
group: constant.DEFAULT_GROUP,
weight: 100,
kind: "grpc",
}
for _, option := range opts {
option(&op)
}
return &Registry{
opts: op,
cli: cli,
}
}
// Register the registration.
func (r *Registry) Register(_ context.Context, si *registry.ServiceInstance) error {
if si.Name == "" {
return ErrServiceInstanceNameEmpty
}
for _, endpoint := range si.Endpoints {
u, err := url.Parse(endpoint)
if err != nil {
return err
}
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
return err
}
p, err := strconv.Atoi(port)
if err != nil {
return err
}
weight := r.opts.weight
var rmd map[string]string
if si.Metadata == nil {
rmd = map[string]string{
"kind": u.Scheme,
"version": si.Version,
}
} else {
rmd = make(map[string]string, len(si.Metadata)+2)
for k, v := range si.Metadata {
rmd[k] = v
}
rmd["kind"] = u.Scheme
rmd["version"] = si.Version
if w, ok := si.Metadata["weight"]; ok {
weight, err = strconv.ParseFloat(w, 64)
if err != nil {
weight = r.opts.weight
}
}
}
_, e := r.cli.RegisterInstance(vo.RegisterInstanceParam{
Ip: host,
Port: uint64(p),
ServiceName: si.Name + "." + u.Scheme,
Weight: weight,
Enable: true,
Healthy: true,
Ephemeral: true,
Metadata: rmd,
ClusterName: r.opts.cluster,
GroupName: r.opts.group,
})
if e != nil {
return fmt.Errorf("RegisterInstance err %v,%v", e, endpoint)
}
}
return nil
}
// Deregister the registration.
func (r *Registry) Deregister(_ context.Context, service *registry.ServiceInstance) error {
for _, endpoint := range service.Endpoints {
u, err := url.Parse(endpoint)
if err != nil {
return err
}
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
return err
}
p, err := strconv.Atoi(port)
if err != nil {
return err
}
if _, err = r.cli.DeregisterInstance(vo.DeregisterInstanceParam{
Ip: host,
Port: uint64(p),
ServiceName: service.Name + "." + u.Scheme,
GroupName: r.opts.group,
Cluster: r.opts.cluster,
Ephemeral: true,
}); err != nil {
return err
}
}
return nil
}
// Watch creates a watcher according to the service name.
func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) {
return newWatcher(ctx, r.cli, serviceName, r.opts.group, r.opts.kind, []string{r.opts.cluster})
}
// GetService return the service instances in memory according to the service name.
func (r *Registry) GetService(_ context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
res, err := r.cli.SelectInstances(vo.SelectInstancesParam{
ServiceName: serviceName,
GroupName: r.opts.group,
HealthyOnly: true,
})
if err != nil {
return nil, err
}
items := make([]*registry.ServiceInstance, 0, len(res))
for _, in := range res {
kind := r.opts.kind
weight := r.opts.weight
if k, ok := in.Metadata["kind"]; ok {
kind = k
}
if in.Weight > 0 {
weight = in.Weight
}
r := &registry.ServiceInstance{
ID: in.InstanceId,
Name: in.ServiceName,
Version: in.Metadata["version"],
Metadata: in.Metadata,
Endpoints: []string{fmt.Sprintf("%s://%s:%d", kind, in.Ip, in.Port)},
}
r.Metadata["weight"] = strconv.Itoa(int(math.Ceil(weight)))
items = append(items, r)
}
return items, nil
}

View File

@@ -0,0 +1,566 @@
package nacos
import (
"context"
"reflect"
"testing"
"time"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"github.com/go-kratos/kratos/v2/registry"
)
var testServerConfig = []constant.ServerConfig{
*constant.NewServerConfig("127.0.0.1", 8848),
}
func TestRegistry_Register(t *testing.T) {
sc := testServerConfig
cc := constant.ClientConfig{
NamespaceId: "public", // namespace id
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
LogDir: "/tmp/nacos/log",
CacheDir: "/tmp/nacos/cache",
//RotateTime: "1h",
//MaxAge: 3,
LogLevel: "debug",
}
// a more graceful way to create naming client
client, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &cc,
ServerConfigs: sc,
},
)
if err != nil {
t.Fatal(err)
}
r := New(client)
testServer := &registry.ServiceInstance{
ID: "1",
Name: "test1",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.1:8080?isSecure=false"},
}
testServerWithMetadata := &registry.ServiceInstance{
ID: "1",
Name: "test1",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.1:8080?isSecure=false"},
Metadata: map[string]string{"idc": "shanghai-xs"},
}
type fields struct {
registry *Registry
}
type args struct {
ctx context.Context
service *registry.ServiceInstance
}
tests := []struct {
name string
fields fields
args args
wantErr bool
deferFunc func(t *testing.T)
}{
{
name: "normal",
fields: fields{
registry: New(client),
},
args: args{
ctx: context.Background(),
service: testServer,
},
wantErr: false,
deferFunc: func(t *testing.T) {
err = r.Deregister(context.Background(), testServer)
if err != nil {
t.Error(err)
}
},
},
{
name: "withMetadata",
fields: fields{
registry: New(client),
},
args: args{
ctx: context.Background(),
service: testServerWithMetadata,
},
wantErr: false,
deferFunc: func(t *testing.T) {
err = r.Deregister(context.Background(), testServerWithMetadata)
if err != nil {
t.Error(err)
}
},
},
{
name: "error",
fields: fields{
registry: New(client),
},
args: args{
ctx: context.Background(),
service: &registry.ServiceInstance{
ID: "1",
Name: "",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.1:8080?isSecure=false"},
},
},
wantErr: true,
},
{
name: "urlError",
fields: fields{
registry: New(client),
},
args: args{
ctx: context.Background(),
service: &registry.ServiceInstance{
ID: "1",
Name: "test",
Version: "v1.0.0",
Endpoints: []string{"127.0.0.1:8080"},
},
},
wantErr: true,
},
{
name: "portError",
fields: fields{
registry: New(client),
},
args: args{
ctx: context.Background(),
service: &registry.ServiceInstance{
ID: "1",
Name: "test",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.1888"},
},
},
wantErr: true,
},
{
name: "withCluster",
fields: fields{
registry: New(client, WithCluster("test")),
},
args: args{
ctx: context.Background(),
service: testServer,
},
wantErr: false,
},
{
name: "withGroup",
fields: fields{
registry: New(client, WithGroup("TEST_GROUP")),
},
args: args{
ctx: context.Background(),
service: testServer,
},
wantErr: false,
},
{
name: "withWeight",
fields: fields{
registry: New(client, WithWeight(200)),
},
args: args{
ctx: context.Background(),
service: testServer,
},
wantErr: false,
},
{
name: "withPrefix",
fields: fields{
registry: New(client, WithPrefix("test")),
},
args: args{
ctx: context.Background(),
service: testServer,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := tt.fields.registry
if err := r.Register(tt.args.ctx, tt.args.service); (err != nil) != tt.wantErr {
t.Errorf("Register error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestRegistry_Deregister(t *testing.T) {
testServer := &registry.ServiceInstance{
ID: "1",
Name: "test2",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.1:8080?isSecure=false"},
}
type args struct {
ctx context.Context
service *registry.ServiceInstance
}
tests := []struct {
name string
args args
wantErr bool
preFunc func(t *testing.T)
}{
{
name: "normal",
args: args{
ctx: context.Background(),
service: testServer,
},
wantErr: false,
preFunc: func(t *testing.T) {
sc := testServerConfig
cc := constant.ClientConfig{
NamespaceId: "public", // namespace id
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
LogDir: "/tmp/nacos/log",
CacheDir: "/tmp/nacos/cache",
//RotateTime: "1h",
//MaxAge: 3,
LogLevel: "debug",
}
// a more graceful way to create naming client
client, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &cc,
ServerConfigs: sc,
},
)
if err != nil {
t.Fatal(err)
}
r := New(client)
err = r.Register(context.Background(), testServer)
if err != nil {
t.Error(err)
}
},
},
{
name: "error",
args: args{
ctx: context.Background(),
service: &registry.ServiceInstance{
ID: "1",
Name: "test",
Version: "v1.0.0",
Endpoints: []string{"127.0.0.1:8080"},
},
},
wantErr: true,
},
{
name: "errorPort",
args: args{
ctx: context.Background(),
service: &registry.ServiceInstance{
ID: "1",
Name: "notExist",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.18080"},
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sc := testServerConfig
cc := constant.ClientConfig{
NamespaceId: "public", // namespace id
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
LogDir: "/tmp/nacos/log",
CacheDir: "/tmp/nacos/cache",
//RotateTime: "1h",
//MaxAge: 3,
LogLevel: "debug",
}
// a more graceful way to create naming client
client, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &cc,
ServerConfigs: sc,
},
)
if err != nil {
t.Fatal(err)
}
r := New(client)
if tt.preFunc != nil {
tt.preFunc(t)
}
if err := r.Deregister(tt.args.ctx, tt.args.service); (err != nil) != tt.wantErr {
t.Errorf("Deregister error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestRegistry_GetService(t *testing.T) {
sc := testServerConfig
cc := constant.ClientConfig{
NamespaceId: "public", // namespace id
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
LogDir: "/tmp/nacos/log",
CacheDir: "/tmp/nacos/cache",
//RotateTime: "1h",
//MaxAge: 3,
LogLevel: "debug",
}
// a more graceful way to create naming client
client, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &cc,
ServerConfigs: sc,
},
)
if err != nil {
t.Fatal(err)
}
r := New(client)
testServer := &registry.ServiceInstance{
ID: "1",
Name: "test3",
Version: "v1.0.0",
Endpoints: []string{"grpc://127.0.0.1:8080?isSecure=false"},
}
type fields struct {
registry *Registry
}
type args struct {
ctx context.Context
serviceName string
}
tests := []struct {
name string
fields fields
args args
want []*registry.ServiceInstance
wantErr bool
preFunc func(t *testing.T)
deferFunc func(t *testing.T)
}{
{
name: "normal",
preFunc: func(t *testing.T) {
err = r.Register(context.Background(), testServer)
if err != nil {
t.Error(err)
}
time.Sleep(time.Second * 3)
},
deferFunc: func(t *testing.T) {
err = r.Deregister(context.Background(), testServer)
if err != nil {
t.Error(err)
}
},
fields: fields{
registry: r,
},
args: args{
ctx: context.Background(),
serviceName: testServer.Name + "." + "grpc",
},
want: []*registry.ServiceInstance{{
ID: "127.0.0.1#8080#DEFAULT#DEFAULT_GROUP@@test3.grpc",
Name: "DEFAULT_GROUP@@test3.grpc",
Version: "v1.0.0",
Metadata: map[string]string{"version": "v1.0.0", "kind": "grpc", "weight": "100"},
Endpoints: []string{"grpc://127.0.0.1:8080"},
}},
wantErr: false,
},
{
name: "errorNotExist",
fields: fields{
registry: r,
},
args: args{
ctx: context.Background(),
serviceName: "notExist",
},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.preFunc != nil {
tt.preFunc(t)
}
if tt.deferFunc != nil {
defer tt.deferFunc(t)
}
r := tt.fields.registry
got, err := r.GetService(tt.args.ctx, tt.args.serviceName)
if (err != nil) != tt.wantErr {
t.Errorf("GetService error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("GetService got = %v", got)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetService got = %v, want %v", got, tt.want)
}
})
}
}
func TestRegistry_Watch(t *testing.T) {
sc := testServerConfig
cc := constant.ClientConfig{
NamespaceId: "public", // namespace id
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
LogDir: "/tmp/nacos/log",
CacheDir: "/tmp/nacos/cache",
//RotateTime: "1h",
//MaxAge: 3,
LogLevel: "debug",
}
// a more graceful way to create naming client
client, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &cc,
ServerConfigs: sc,
},
)
if err != nil {
t.Fatal(err)
}
r := New(client)
testServer := &registry.ServiceInstance{
ID: "1",
Name: "test4",
Version: "v1.0.0",
Endpoints: []string{"grpc://127.0.0.1:8080?isSecure=false"},
}
cancelCtx, cancel := context.WithCancel(context.Background())
type fields struct {
registry *Registry
}
type args struct {
ctx context.Context
serviceName string
}
tests := []struct {
name string
fields fields
args args
wantErr bool
want []*registry.ServiceInstance
processFunc func(t *testing.T)
}{
{
name: "normal",
fields: fields{
registry: New(client),
},
args: args{
ctx: context.Background(),
serviceName: testServer.Name + "." + "grpc",
},
wantErr: false,
want: []*registry.ServiceInstance{{
ID: "127.0.0.1#8080#DEFAULT#DEFAULT_GROUP@@test4.grpc",
Name: "DEFAULT_GROUP@@test4.grpc",
Version: "v1.0.0",
Metadata: map[string]string{"version": "v1.0.0", "kind": "grpc"},
Endpoints: []string{"grpc://127.0.0.1:8080"},
}},
processFunc: func(t *testing.T) {
err = r.Register(context.Background(), testServer)
if err != nil {
t.Error(err)
}
},
},
{
name: "ctxCancel",
fields: fields{
registry: r,
},
args: args{
ctx: cancelCtx,
serviceName: testServer.Name,
},
wantErr: true,
want: nil,
processFunc: func(*testing.T) {
cancel()
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := tt.fields.registry
watch, err := r.Watch(tt.args.ctx, tt.args.serviceName)
if err != nil {
t.Error(err)
return
}
defer func() {
err = watch.Stop()
if err != nil {
t.Error(err)
}
}()
_, err = watch.Next()
if err != nil {
t.Error(err)
return
}
if tt.processFunc != nil {
tt.processFunc(t)
}
want, err := watch.Next()
if (err != nil) != tt.wantErr {
t.Errorf("Watch error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(want, tt.want) {
t.Errorf("Watch watcher = %v, want %v", watch, tt.want)
}
})
}
}

93
registry/nacos/watcher.go Normal file
View File

@@ -0,0 +1,93 @@
package nacos
import (
"context"
"fmt"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"github.com/go-kratos/kratos/v2/registry"
)
var _ registry.Watcher = (*watcher)(nil)
type watcher struct {
serviceName string
clusters []string
groupName string
ctx context.Context
cancel context.CancelFunc
watchChan chan struct{}
cli naming_client.INamingClient
kind string
subscribeParam *vo.SubscribeParam
}
func newWatcher(ctx context.Context, cli naming_client.INamingClient, serviceName, groupName, kind string, clusters []string) (*watcher, error) {
w := &watcher{
serviceName: serviceName,
clusters: clusters,
groupName: groupName,
cli: cli,
kind: kind,
watchChan: make(chan struct{}, 1),
}
w.ctx, w.cancel = context.WithCancel(ctx)
w.subscribeParam = &vo.SubscribeParam{
ServiceName: serviceName,
Clusters: clusters,
GroupName: groupName,
SubscribeCallback: func([]model.Instance, error) {
select {
case w.watchChan <- struct{}{}:
default:
}
},
}
e := w.cli.Subscribe(w.subscribeParam)
select {
case w.watchChan <- struct{}{}:
default:
}
return w, e
}
func (w *watcher) Next() ([]*registry.ServiceInstance, error) {
select {
case <-w.ctx.Done():
return nil, w.ctx.Err()
case <-w.watchChan:
}
res, err := w.cli.GetService(vo.GetServiceParam{
ServiceName: w.serviceName,
GroupName: w.groupName,
Clusters: w.clusters,
})
if err != nil {
return nil, err
}
items := make([]*registry.ServiceInstance, 0, len(res.Hosts))
for _, in := range res.Hosts {
kind := w.kind
if k, ok := in.Metadata["kind"]; ok {
kind = k
}
items = append(items, &registry.ServiceInstance{
ID: in.InstanceId,
Name: res.Name,
Version: in.Metadata["version"],
Metadata: in.Metadata,
Endpoints: []string{fmt.Sprintf("%s://%s:%d", kind, in.Ip, in.Port)},
})
}
return items, nil
}
func (w *watcher) Stop() error {
err := w.cli.Unsubscribe(w.subscribeParam)
w.cancel()
return err
}