feat: registry.
This commit is contained in:
423
registry/kubernetes/registry.go
Normal file
423
registry/kubernetes/registry.go
Normal file
@@ -0,0 +1,423 @@
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
listerv1 "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"github.com/go-kratos/kratos/v2/registry"
|
||||
)
|
||||
|
||||
// Defines the key name of specific fields
|
||||
// Kratos needs to cooperate with the following fields to run properly on Kubernetes:
|
||||
// kratos-service-id: define the ID of the service
|
||||
// kratos-service-app: define the name of the service
|
||||
// kratos-service-version: define the version of the service
|
||||
// kratos-service-metadata: define the metadata of the service
|
||||
// kratos-service-protocols: define the protocols of the service
|
||||
//
|
||||
// Example Deployment:
|
||||
/*
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: nginx
|
||||
labels:
|
||||
app: nginx
|
||||
spec:
|
||||
replicas: 5
|
||||
selector:
|
||||
matchLabels:
|
||||
app: nginx
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: nginx
|
||||
kratos-service-id: "56991810-c77f-4a95-8190-393efa9c1a61"
|
||||
kratos-service-app: "nginx"
|
||||
kratos-service-version: "v3.5.0"
|
||||
annotations:
|
||||
kratos-service-protocols: |
|
||||
{"80": "http"}
|
||||
kratos-service-metadata: |
|
||||
{"region": "sh", "zone": "sh001", "cluster": "pd"}
|
||||
spec:
|
||||
containers:
|
||||
- name: nginx
|
||||
image: nginx:1.7.9
|
||||
ports:
|
||||
- containerPort: 80
|
||||
*/
|
||||
const (
|
||||
// LabelsKeyServiceID is used to define the ID of the service
|
||||
LabelsKeyServiceID = "kratos-service-id"
|
||||
// LabelsKeyServiceName is used to define the name of the service
|
||||
LabelsKeyServiceName = "kratos-service-app"
|
||||
// LabelsKeyServiceVersion is used to define the version of the service
|
||||
LabelsKeyServiceVersion = "kratos-service-version"
|
||||
// AnnotationsKeyMetadata is used to define the metadata of the service
|
||||
AnnotationsKeyMetadata = "kratos-service-metadata"
|
||||
// AnnotationsKeyProtocolMap is used to define the protocols of the service
|
||||
// Through the value of this field; Kratos can obtain the application layer protocol corresponding to the port
|
||||
// Example value: {"80": "http", "8081": "grpc"}
|
||||
AnnotationsKeyProtocolMap = "kratos-service-protocols"
|
||||
)
|
||||
|
||||
// The Registry simply implements service discovery based on Kubernetes
|
||||
// It has not been verified in the production environment and is currently for reference only
|
||||
type Registry struct {
|
||||
clientSet *kubernetes.Clientset
|
||||
informerFactory informers.SharedInformerFactory
|
||||
podInformer cache.SharedIndexInformer
|
||||
podLister listerv1.PodLister
|
||||
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
// New is used to initialize the Registry
|
||||
func New(clientSet *kubernetes.Clientset, namespace string) *Registry {
|
||||
if strings.EqualFold(namespace, "") {
|
||||
namespace = metav1.NamespaceAll
|
||||
}
|
||||
informerFactory := informers.NewSharedInformerFactoryWithOptions(clientSet, time.Minute*10, informers.WithNamespace(namespace))
|
||||
podInformer := informerFactory.Core().V1().Pods().Informer()
|
||||
podLister := informerFactory.Core().V1().Pods().Lister()
|
||||
return &Registry{
|
||||
clientSet: clientSet,
|
||||
informerFactory: informerFactory,
|
||||
podInformer: podInformer,
|
||||
podLister: podLister,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Register is used to register services
|
||||
// Note that on Kubernetes, it can only be used to update the id/name/version/metadata/protocols of the current service,
|
||||
// but it cannot be used to update node.
|
||||
func (s *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error {
|
||||
// GetMetadata
|
||||
metadataVal, err := marshal(service.Metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Generate ProtocolMap
|
||||
protocolMap, err := getProtocolMapByEndpoints(service.Endpoints)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
protocolMapVal, err := marshal(protocolMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
patchBytes, err := jsoniter.Marshal(map[string]any{
|
||||
"metadata": metav1.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
LabelsKeyServiceID: service.ID,
|
||||
LabelsKeyServiceName: service.Name,
|
||||
LabelsKeyServiceVersion: service.Version,
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
AnnotationsKeyMetadata: metadataVal,
|
||||
AnnotationsKeyProtocolMap: protocolMapVal,
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err = s.clientSet.
|
||||
CoreV1().
|
||||
Pods(GetNamespace()).
|
||||
Patch(ctx, GetPodName(), types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Deregister the registration.
|
||||
func (s *Registry) Deregister(ctx context.Context, _ *registry.ServiceInstance) error {
|
||||
return s.Register(ctx, ®istry.ServiceInstance{
|
||||
Metadata: map[string]string{},
|
||||
})
|
||||
}
|
||||
|
||||
// GetService return the service instances in memory according to the service name.
|
||||
func (s *Registry) GetService(_ context.Context, name string) ([]*registry.ServiceInstance, error) {
|
||||
pods, err := s.podLister.List(labels.SelectorFromSet(map[string]string{
|
||||
LabelsKeyServiceName: name,
|
||||
}))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret := make([]*registry.ServiceInstance, 0, len(pods))
|
||||
for _, pod := range pods {
|
||||
if pod.Status.Phase != corev1.PodRunning {
|
||||
continue
|
||||
}
|
||||
instance, err := getServiceInstanceFromPod(pod)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret = append(ret, instance)
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (s *Registry) sendLatestInstances(ctx context.Context, name string, announcement chan []*registry.ServiceInstance) {
|
||||
instances, err := s.GetService(ctx, name)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
announcement <- instances
|
||||
}
|
||||
|
||||
// Watch creates a watcher according to the service name.
|
||||
func (s *Registry) Watch(ctx context.Context, name string) (registry.Watcher, error) {
|
||||
stopCh := make(chan struct{}, 1)
|
||||
announcement := make(chan []*registry.ServiceInstance, 1)
|
||||
_, _ = s.podInformer.AddEventHandler(cache.FilteringResourceEventHandler{
|
||||
FilterFunc: func(obj any) bool {
|
||||
select {
|
||||
case <-stopCh:
|
||||
return false
|
||||
case <-s.stopCh:
|
||||
return false
|
||||
default:
|
||||
pod := obj.(*corev1.Pod)
|
||||
val := pod.GetLabels()[LabelsKeyServiceName]
|
||||
return val == name
|
||||
}
|
||||
},
|
||||
Handler: cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(any) {
|
||||
s.sendLatestInstances(ctx, name, announcement)
|
||||
},
|
||||
UpdateFunc: func(any, any) {
|
||||
s.sendLatestInstances(ctx, name, announcement)
|
||||
},
|
||||
DeleteFunc: func(any) {
|
||||
s.sendLatestInstances(ctx, name, announcement)
|
||||
},
|
||||
},
|
||||
})
|
||||
return NewIterator(announcement, stopCh), nil
|
||||
}
|
||||
|
||||
// Start is used to start the Registry
|
||||
// It is non-blocking
|
||||
func (s *Registry) Start() {
|
||||
s.informerFactory.Start(s.stopCh)
|
||||
if !cache.WaitForCacheSync(s.stopCh, s.podInformer.HasSynced) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Close is used to close the Registry
|
||||
// After closing, any callbacks generated by Watch will not be executed
|
||||
func (s *Registry) Close() {
|
||||
select {
|
||||
case <-s.stopCh:
|
||||
default:
|
||||
close(s.stopCh)
|
||||
}
|
||||
}
|
||||
|
||||
// //////////// K8S Runtime ////////////
|
||||
|
||||
// ServiceAccountNamespacePath defines the location of the namespace file
|
||||
const ServiceAccountNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
|
||||
|
||||
var currentNamespace = LoadNamespace()
|
||||
|
||||
// LoadNamespace is used to get the current namespace from the file
|
||||
func LoadNamespace() string {
|
||||
data, err := os.ReadFile(ServiceAccountNamespacePath)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return string(data)
|
||||
}
|
||||
|
||||
// GetNamespace is used to get the namespace of the Pod where the current container is located
|
||||
func GetNamespace() string {
|
||||
return currentNamespace
|
||||
}
|
||||
|
||||
// GetPodName is used to get the name of the Pod where the current container is located
|
||||
func GetPodName() string {
|
||||
return os.Getenv("HOSTNAME")
|
||||
}
|
||||
|
||||
// //////////// ProtocolMap ////////////
|
||||
|
||||
type protocolMap map[string]string
|
||||
|
||||
func (m protocolMap) GetProtocol(port int32) string {
|
||||
return m[strconv.Itoa(int(port))]
|
||||
}
|
||||
|
||||
// //////////// Iterator ////////////
|
||||
|
||||
// Iterator performs the conversion from channel to iterator
|
||||
// It reads the latest changes from the `chan []*registry.ServiceInstance`
|
||||
// And the outside can sense the closure of Iterator through stopCh
|
||||
type Iterator struct {
|
||||
ch chan []*registry.ServiceInstance
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
// NewIterator is used to initialize Iterator
|
||||
func NewIterator(channel chan []*registry.ServiceInstance, stopCh chan struct{}) *Iterator {
|
||||
return &Iterator{
|
||||
ch: channel,
|
||||
stopCh: stopCh,
|
||||
}
|
||||
}
|
||||
|
||||
// Next will block until ServiceInstance changes
|
||||
func (iter *Iterator) Next() ([]*registry.ServiceInstance, error) {
|
||||
select {
|
||||
case instances := <-iter.ch:
|
||||
return instances, nil
|
||||
case <-iter.stopCh:
|
||||
return nil, ErrIteratorClosed
|
||||
}
|
||||
}
|
||||
|
||||
// Stop is used to close the iterator
|
||||
func (iter *Iterator) Stop() error {
|
||||
select {
|
||||
case <-iter.stopCh:
|
||||
default:
|
||||
close(iter.stopCh)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// //////////// Helper Func ////////////
|
||||
|
||||
func marshal(in any) (string, error) {
|
||||
return jsoniter.MarshalToString(in)
|
||||
}
|
||||
|
||||
func unmarshal(data string, in any) error {
|
||||
return jsoniter.UnmarshalFromString(data, in)
|
||||
}
|
||||
|
||||
func isEmptyObjectString(s string) bool {
|
||||
switch s {
|
||||
case "", "{}", "null", "nil", "[]":
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func getProtocolMapByEndpoints(endpoints []string) (protocolMap, error) {
|
||||
ret := protocolMap{}
|
||||
for _, endpoint := range endpoints {
|
||||
u, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret[u.Port()] = u.Scheme
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func getProtocolMapFromPod(pod *corev1.Pod) (protocolMap, error) {
|
||||
protoMap := protocolMap{}
|
||||
if s := pod.Annotations[AnnotationsKeyProtocolMap]; !isEmptyObjectString(s) {
|
||||
err := unmarshal(s, &protoMap)
|
||||
if err != nil {
|
||||
return nil, &ErrorHandleResource{Namespace: pod.Namespace, Name: pod.Name, Reason: err}
|
||||
}
|
||||
}
|
||||
return protoMap, nil
|
||||
}
|
||||
|
||||
func getMetadataFromPod(pod *corev1.Pod) (map[string]string, error) {
|
||||
metadata := map[string]string{}
|
||||
if s := pod.Annotations[AnnotationsKeyMetadata]; !isEmptyObjectString(s) {
|
||||
err := unmarshal(s, &metadata)
|
||||
if err != nil {
|
||||
return nil, &ErrorHandleResource{Namespace: pod.Namespace, Name: pod.Name, Reason: err}
|
||||
}
|
||||
}
|
||||
return metadata, nil
|
||||
}
|
||||
|
||||
func getServiceInstanceFromPod(pod *corev1.Pod) (*registry.ServiceInstance, error) {
|
||||
podIP := pod.Status.PodIP
|
||||
podLabels := pod.GetLabels()
|
||||
// Get Metadata
|
||||
metadata, err := getMetadataFromPod(pod)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Get Protocols Definition
|
||||
protocolMap, err := getProtocolMapFromPod(pod)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get Endpoints
|
||||
var endpoints []string
|
||||
for _, container := range pod.Spec.Containers {
|
||||
for _, cp := range container.Ports {
|
||||
port := cp.ContainerPort
|
||||
protocol := protocolMap.GetProtocol(port)
|
||||
if protocol == "" {
|
||||
if cp.Name != "" {
|
||||
protocol = strings.Split(cp.Name, "-")[0]
|
||||
} else {
|
||||
protocol = string(cp.Protocol)
|
||||
}
|
||||
}
|
||||
addr := fmt.Sprintf("%s://%s:%d", protocol, podIP, port)
|
||||
endpoints = append(endpoints, addr)
|
||||
}
|
||||
}
|
||||
return ®istry.ServiceInstance{
|
||||
ID: podLabels[LabelsKeyServiceID],
|
||||
Name: podLabels[LabelsKeyServiceName],
|
||||
Version: podLabels[LabelsKeyServiceVersion],
|
||||
Metadata: metadata,
|
||||
Endpoints: endpoints,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// //////////// Error Definition ////////////
|
||||
|
||||
// ErrIteratorClosed defines the error that the iterator is closed
|
||||
var ErrIteratorClosed = errors.New("iterator closed")
|
||||
|
||||
// ErrorHandleResource defines the error that cannot handle K8S resources normally
|
||||
type ErrorHandleResource struct {
|
||||
Namespace string
|
||||
Name string
|
||||
Reason error
|
||||
}
|
||||
|
||||
// Error implements the error interface
|
||||
func (err *ErrorHandleResource) Error() string {
|
||||
return fmt.Sprintf("failed to handle resource(namespace=%s, name=%s): %s",
|
||||
err.Namespace, err.Name, err.Reason)
|
||||
}
|
||||
Reference in New Issue
Block a user