Compare commits
2 Commits
database/i
...
api/v0.0.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fcd2a5ee43 | ||
|
|
989f5da01f |
@@ -433,21 +433,22 @@ func (x *Data_Redis) GetEnableMetrics() bool {
|
||||
type Data_MongoDB struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
Uri string `protobuf:"bytes,1,opt,name=uri,proto3" json:"uri,omitempty"`
|
||||
Username *string `protobuf:"bytes,2,opt,name=username,proto3,oneof" json:"username,omitempty"`
|
||||
Password *string `protobuf:"bytes,3,opt,name=password,proto3,oneof" json:"password,omitempty"`
|
||||
AuthMechanism *string `protobuf:"bytes,4,opt,name=auth_mechanism,json=authMechanism,proto3,oneof" json:"auth_mechanism,omitempty"` // 认证机制:SCRAM-SHA-1、SCRAM-SHA-256、MONGODB-X509、GSSAPI、PLAIN
|
||||
AuthMechanismProperties map[string]string `protobuf:"bytes,5,rep,name=auth_mechanism_properties,json=authMechanismProperties,proto3" json:"auth_mechanism_properties,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` // 认证机制属性
|
||||
AuthSource *string `protobuf:"bytes,6,opt,name=auth_source,json=authSource,proto3,oneof" json:"auth_source,omitempty"` // 认证源:admin、$external等
|
||||
ConnectTimeout *durationpb.Duration `protobuf:"bytes,50,opt,name=connect_timeout,json=connectTimeout,proto3" json:"connect_timeout,omitempty"` // 连接超时时间
|
||||
HeartbeatInterval *durationpb.Duration `protobuf:"bytes,51,opt,name=heartbeat_interval,json=heartbeatInterval,proto3" json:"heartbeat_interval,omitempty"` // 心跳间隔
|
||||
LocalThreshold *durationpb.Duration `protobuf:"bytes,52,opt,name=local_threshold,json=localThreshold,proto3" json:"local_threshold,omitempty"` // 本地延迟阈值
|
||||
MaxConnIdleTime *durationpb.Duration `protobuf:"bytes,53,opt,name=max_conn_idle_time,json=maxConnIdleTime,proto3" json:"max_conn_idle_time,omitempty"` // 最大连接空闲时间
|
||||
MaxStaleness *durationpb.Duration `protobuf:"bytes,54,opt,name=max_staleness,json=maxStaleness,proto3" json:"max_staleness,omitempty"` // 最大陈旧时间
|
||||
ServerSelectionTimeout *durationpb.Duration `protobuf:"bytes,55,opt,name=server_selection_timeout,json=serverSelectionTimeout,proto3" json:"server_selection_timeout,omitempty"` // 服务器选择超时时间
|
||||
SocketTimeout *durationpb.Duration `protobuf:"bytes,56,opt,name=socket_timeout,json=socketTimeout,proto3" json:"socket_timeout,omitempty"` // 套接字超时时间
|
||||
Timeout *durationpb.Duration `protobuf:"bytes,57,opt,name=timeout,proto3" json:"timeout,omitempty"` // 总超时时间
|
||||
EnableTracing bool `protobuf:"varint,100,opt,name=enable_tracing,json=enableTracing,proto3" json:"enable_tracing,omitempty"` // 打开链路追踪
|
||||
EnableMetrics bool `protobuf:"varint,101,opt,name=enable_metrics,json=enableMetrics,proto3" json:"enable_metrics,omitempty"` // 打开性能度量
|
||||
Database *string `protobuf:"bytes,2,opt,name=database,proto3,oneof" json:"database,omitempty"`
|
||||
Username *string `protobuf:"bytes,10,opt,name=username,proto3,oneof" json:"username,omitempty"`
|
||||
Password *string `protobuf:"bytes,11,opt,name=password,proto3,oneof" json:"password,omitempty"`
|
||||
AuthMechanism *string `protobuf:"bytes,20,opt,name=auth_mechanism,json=authMechanism,proto3,oneof" json:"auth_mechanism,omitempty"` // 认证机制:SCRAM-SHA-1、SCRAM-SHA-256、MONGODB-X509、GSSAPI、PLAIN
|
||||
AuthMechanismProperties map[string]string `protobuf:"bytes,21,rep,name=auth_mechanism_properties,json=authMechanismProperties,proto3" json:"auth_mechanism_properties,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` // 认证机制属性
|
||||
AuthSource *string `protobuf:"bytes,22,opt,name=auth_source,json=authSource,proto3,oneof" json:"auth_source,omitempty"` // 认证源:admin、$external等
|
||||
ConnectTimeout *durationpb.Duration `protobuf:"bytes,50,opt,name=connect_timeout,json=connectTimeout,proto3" json:"connect_timeout,omitempty"` // 连接超时时间
|
||||
HeartbeatInterval *durationpb.Duration `protobuf:"bytes,51,opt,name=heartbeat_interval,json=heartbeatInterval,proto3" json:"heartbeat_interval,omitempty"` // 心跳间隔
|
||||
LocalThreshold *durationpb.Duration `protobuf:"bytes,52,opt,name=local_threshold,json=localThreshold,proto3" json:"local_threshold,omitempty"` // 本地延迟阈值
|
||||
MaxConnIdleTime *durationpb.Duration `protobuf:"bytes,53,opt,name=max_conn_idle_time,json=maxConnIdleTime,proto3" json:"max_conn_idle_time,omitempty"` // 最大连接空闲时间
|
||||
MaxStaleness *durationpb.Duration `protobuf:"bytes,54,opt,name=max_staleness,json=maxStaleness,proto3" json:"max_staleness,omitempty"` // 最大陈旧时间
|
||||
ServerSelectionTimeout *durationpb.Duration `protobuf:"bytes,55,opt,name=server_selection_timeout,json=serverSelectionTimeout,proto3" json:"server_selection_timeout,omitempty"` // 服务器选择超时时间
|
||||
SocketTimeout *durationpb.Duration `protobuf:"bytes,56,opt,name=socket_timeout,json=socketTimeout,proto3" json:"socket_timeout,omitempty"` // 套接字超时时间
|
||||
Timeout *durationpb.Duration `protobuf:"bytes,57,opt,name=timeout,proto3" json:"timeout,omitempty"` // 总超时时间
|
||||
EnableTracing bool `protobuf:"varint,100,opt,name=enable_tracing,json=enableTracing,proto3" json:"enable_tracing,omitempty"` // 打开链路追踪
|
||||
EnableMetrics bool `protobuf:"varint,101,opt,name=enable_metrics,json=enableMetrics,proto3" json:"enable_metrics,omitempty"` // 打开性能度量
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -489,6 +490,13 @@ func (x *Data_MongoDB) GetUri() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Data_MongoDB) GetDatabase() string {
|
||||
if x != nil && x.Database != nil {
|
||||
return *x.Database
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Data_MongoDB) GetUsername() string {
|
||||
if x != nil && x.Username != nil {
|
||||
return *x.Username
|
||||
@@ -1835,7 +1843,7 @@ var File_conf_v1_kratos_conf_data_proto protoreflect.FileDescriptor
|
||||
|
||||
const file_conf_v1_kratos_conf_data_proto_rawDesc = "" +
|
||||
"\n" +
|
||||
"\x1econf/v1/kratos_conf_data.proto\x12\x04conf\x1a\x1egoogle/protobuf/duration.proto\x1a\x1dconf/v1/kratos_conf_tls.proto\"\x995\n" +
|
||||
"\x1econf/v1/kratos_conf_data.proto\x12\x04conf\x1a\x1egoogle/protobuf/duration.proto\x1a\x1dconf/v1/kratos_conf_tls.proto\"\xc75\n" +
|
||||
"\x04Data\x124\n" +
|
||||
"\bdatabase\x18\x01 \x01(\v2\x13.conf.Data.DatabaseH\x00R\bdatabase\x88\x01\x01\x12+\n" +
|
||||
"\x05redis\x18\n" +
|
||||
@@ -1886,14 +1894,16 @@ const file_conf_v1_kratos_conf_data_proto_rawDesc = "" +
|
||||
"\fread_timeout\x183 \x01(\v2\x19.google.protobuf.DurationR\vreadTimeout\x12>\n" +
|
||||
"\rwrite_timeout\x184 \x01(\v2\x19.google.protobuf.DurationR\fwriteTimeout\x12%\n" +
|
||||
"\x0eenable_tracing\x18d \x01(\bR\renableTracing\x12&\n" +
|
||||
"\x0eenable_metrics\x18\xe9\a \x01(\bR\renableMetrics\x1a\x99\b\n" +
|
||||
"\x0eenable_metrics\x18\xe9\a \x01(\bR\renableMetrics\x1a\xc7\b\n" +
|
||||
"\aMongoDB\x12\x10\n" +
|
||||
"\x03uri\x18\x01 \x01(\tR\x03uri\x12\x1f\n" +
|
||||
"\busername\x18\x02 \x01(\tH\x00R\busername\x88\x01\x01\x12\x1f\n" +
|
||||
"\bpassword\x18\x03 \x01(\tH\x01R\bpassword\x88\x01\x01\x12*\n" +
|
||||
"\x0eauth_mechanism\x18\x04 \x01(\tH\x02R\rauthMechanism\x88\x01\x01\x12k\n" +
|
||||
"\x19auth_mechanism_properties\x18\x05 \x03(\v2/.conf.Data.MongoDB.AuthMechanismPropertiesEntryR\x17authMechanismProperties\x12$\n" +
|
||||
"\vauth_source\x18\x06 \x01(\tH\x03R\n" +
|
||||
"\bdatabase\x18\x02 \x01(\tH\x00R\bdatabase\x88\x01\x01\x12\x1f\n" +
|
||||
"\busername\x18\n" +
|
||||
" \x01(\tH\x01R\busername\x88\x01\x01\x12\x1f\n" +
|
||||
"\bpassword\x18\v \x01(\tH\x02R\bpassword\x88\x01\x01\x12*\n" +
|
||||
"\x0eauth_mechanism\x18\x14 \x01(\tH\x03R\rauthMechanism\x88\x01\x01\x12k\n" +
|
||||
"\x19auth_mechanism_properties\x18\x15 \x03(\v2/.conf.Data.MongoDB.AuthMechanismPropertiesEntryR\x17authMechanismProperties\x12$\n" +
|
||||
"\vauth_source\x18\x16 \x01(\tH\x04R\n" +
|
||||
"authSource\x88\x01\x01\x12B\n" +
|
||||
"\x0fconnect_timeout\x182 \x01(\v2\x19.google.protobuf.DurationR\x0econnectTimeout\x12H\n" +
|
||||
"\x12heartbeat_interval\x183 \x01(\v2\x19.google.protobuf.DurationR\x11heartbeatInterval\x12B\n" +
|
||||
@@ -1908,6 +1918,7 @@ const file_conf_v1_kratos_conf_data_proto_rawDesc = "" +
|
||||
"\x1cAuthMechanismPropertiesEntry\x12\x10\n" +
|
||||
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
|
||||
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01B\v\n" +
|
||||
"\t_databaseB\v\n" +
|
||||
"\t_usernameB\v\n" +
|
||||
"\t_passwordB\x11\n" +
|
||||
"\x0f_auth_mechanismB\x0e\n" +
|
||||
|
||||
@@ -48,12 +48,14 @@ message Data {
|
||||
message MongoDB {
|
||||
string uri = 1;
|
||||
|
||||
optional string username = 2;
|
||||
optional string password = 3;
|
||||
optional string database = 2;
|
||||
|
||||
optional string auth_mechanism = 4; // 认证机制:SCRAM-SHA-1、SCRAM-SHA-256、MONGODB-X509、GSSAPI、PLAIN
|
||||
map<string, string> auth_mechanism_properties = 5; // 认证机制属性
|
||||
optional string auth_source = 6; // 认证源:admin、$external等
|
||||
optional string username = 10;
|
||||
optional string password = 11;
|
||||
|
||||
optional string auth_mechanism = 20; // 认证机制:SCRAM-SHA-1、SCRAM-SHA-256、MONGODB-X509、GSSAPI、PLAIN
|
||||
map<string, string> auth_mechanism_properties = 21; // 认证机制属性
|
||||
optional string auth_source = 22; // 认证源:admin、$external等
|
||||
|
||||
google.protobuf.Duration connect_timeout = 50; // 连接超时时间
|
||||
google.protobuf.Duration heartbeat_interval = 51; // 心跳间隔
|
||||
|
||||
171
database/influxdb/utils.go
Normal file
171
database/influxdb/utils.go
Normal file
@@ -0,0 +1,171 @@
|
||||
package influxdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func GetPointTag(point *influxdb3.Point, name string) *string {
|
||||
if point == nil {
|
||||
return nil
|
||||
}
|
||||
tagValue, ok := point.GetTag(name)
|
||||
if !ok || tagValue == "" {
|
||||
return nil
|
||||
}
|
||||
return &tagValue
|
||||
}
|
||||
|
||||
func GetBoolPointTag(point *influxdb3.Point, name string) *bool {
|
||||
if point == nil {
|
||||
return nil
|
||||
}
|
||||
tagValue, ok := point.GetTag(name)
|
||||
if !ok || tagValue == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
value := tagValue == "true"
|
||||
return &value
|
||||
}
|
||||
|
||||
func GetUint32PointTag(point *influxdb3.Point, name string) *uint32 {
|
||||
if point == nil {
|
||||
return nil
|
||||
}
|
||||
tagValue, ok := point.GetTag(name)
|
||||
if !ok || tagValue == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
value, err := strconv.ParseUint(tagValue, 10, 64)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
value32 := uint32(value)
|
||||
return &value32
|
||||
}
|
||||
|
||||
func GetUint64PointTag(point *influxdb3.Point, name string) *uint64 {
|
||||
if point == nil {
|
||||
return nil
|
||||
}
|
||||
tagValue, ok := point.GetTag(name)
|
||||
if !ok || tagValue == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
value, err := strconv.ParseUint(tagValue, 10, 64)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return &value
|
||||
}
|
||||
|
||||
func GetEnumPointTag[T ~int32](point *influxdb3.Point, name string, valueMap map[string]int32) *T {
|
||||
if point == nil {
|
||||
return nil
|
||||
}
|
||||
tagValue, ok := point.GetTag(name)
|
||||
if !ok || tagValue == "" {
|
||||
return nil
|
||||
}
|
||||
enumValue, exists := valueMap[tagValue]
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
enumType := T(enumValue)
|
||||
return &enumType
|
||||
}
|
||||
|
||||
func GetTimestampField(point *influxdb3.Point, name string) *timestamppb.Timestamp {
|
||||
if point == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
value := point.GetField(name)
|
||||
if value == nil {
|
||||
return nil
|
||||
}
|
||||
if timestamp, ok := value.(*timestamppb.Timestamp); ok {
|
||||
return timestamp
|
||||
}
|
||||
if timeValue, ok := value.(time.Time); ok {
|
||||
return timestamppb.New(timeValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetUint32Field(point *influxdb3.Point, name string) *uint32 {
|
||||
if point == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
value := point.GetUIntegerField(name)
|
||||
if value == nil {
|
||||
return nil
|
||||
}
|
||||
uint32Value := uint32(*value)
|
||||
if uint32Value == 0 {
|
||||
return nil
|
||||
}
|
||||
return &uint32Value
|
||||
}
|
||||
|
||||
func BoolToString(value *bool) string {
|
||||
if value == nil {
|
||||
return "false"
|
||||
}
|
||||
if *value {
|
||||
return "true"
|
||||
}
|
||||
return "false"
|
||||
}
|
||||
|
||||
func Uint64ToString(value *uint64) string {
|
||||
if value == nil {
|
||||
return "0"
|
||||
}
|
||||
return fmt.Sprintf("%d", *value)
|
||||
}
|
||||
|
||||
func BuildQueryWithParams(
|
||||
table string,
|
||||
filters map[string]interface{},
|
||||
operators map[string]string,
|
||||
fields []string,
|
||||
) string {
|
||||
var queryBuilder strings.Builder
|
||||
|
||||
// 构建 SELECT 语句
|
||||
queryBuilder.WriteString("SELECT ")
|
||||
if len(fields) > 0 {
|
||||
queryBuilder.WriteString(strings.Join(fields, ", "))
|
||||
} else {
|
||||
queryBuilder.WriteString("*")
|
||||
}
|
||||
queryBuilder.WriteString(fmt.Sprintf(" FROM %s", table))
|
||||
|
||||
// 构建 WHERE 条件
|
||||
if len(filters) > 0 {
|
||||
var operator string
|
||||
queryBuilder.WriteString(" WHERE ")
|
||||
var conditions []string
|
||||
for key, value := range filters {
|
||||
operator = "=" // 默认操作符
|
||||
if op, exists := operators[key]; exists {
|
||||
operator = op
|
||||
}
|
||||
conditions = append(conditions, fmt.Sprintf("%s %s %v", key, operator, value))
|
||||
}
|
||||
queryBuilder.WriteString(strings.Join(conditions, " AND "))
|
||||
}
|
||||
|
||||
return queryBuilder.String()
|
||||
}
|
||||
76
database/influxdb/utils_test.go
Normal file
76
database/influxdb/utils_test.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package influxdb
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBuildQueryWithParams(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
table string
|
||||
filters map[string]interface{}
|
||||
operators map[string]string
|
||||
fields []string
|
||||
expectedQuery string
|
||||
}{
|
||||
{
|
||||
name: "Basic query with filters and fields",
|
||||
table: "candles",
|
||||
filters: map[string]interface{}{"s": "'AAPL'", "o": 150.0},
|
||||
operators: map[string]string{"o": ">"},
|
||||
fields: []string{"s", "o", "h", "l", "c", "v"},
|
||||
expectedQuery: "SELECT s, o, h, l, c, v FROM candles WHERE s = 'AAPL' AND o > 150",
|
||||
},
|
||||
{
|
||||
name: "Query with no filters",
|
||||
table: "candles",
|
||||
filters: map[string]interface{}{},
|
||||
operators: map[string]string{},
|
||||
fields: []string{"s", "o", "h"},
|
||||
expectedQuery: "SELECT s, o, h FROM candles",
|
||||
},
|
||||
{
|
||||
name: "Query with no fields",
|
||||
table: "candles",
|
||||
filters: map[string]interface{}{"s": "'AAPL'"},
|
||||
operators: map[string]string{},
|
||||
fields: []string{},
|
||||
expectedQuery: "SELECT * FROM candles WHERE s = 'AAPL'",
|
||||
},
|
||||
{
|
||||
name: "Empty table name",
|
||||
table: "",
|
||||
filters: map[string]interface{}{"s": "'AAPL'"},
|
||||
operators: map[string]string{},
|
||||
fields: []string{"s", "o"},
|
||||
expectedQuery: "SELECT s, o FROM WHERE s = 'AAPL'",
|
||||
},
|
||||
{
|
||||
name: "Special characters in filters",
|
||||
table: "candles",
|
||||
filters: map[string]interface{}{"name": "'O'Reilly'"},
|
||||
operators: map[string]string{},
|
||||
fields: []string{"name"},
|
||||
expectedQuery: "SELECT name FROM candles WHERE name = 'O'Reilly'",
|
||||
},
|
||||
{
|
||||
name: "Query with interval filters",
|
||||
table: "candles",
|
||||
filters: map[string]interface{}{"time": "now() - interval '15 minutes'"},
|
||||
operators: map[string]string{"time": ">="},
|
||||
fields: []string{"*"},
|
||||
expectedQuery: "SELECT * FROM candles WHERE time >= now() - interval '15 minutes'",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
query := BuildQueryWithParams(tt.table, tt.filters, tt.operators, tt.fields)
|
||||
|
||||
if query != tt.expectedQuery {
|
||||
t.Errorf("expected query %s, got %s", tt.expectedQuery, query)
|
||||
}
|
||||
//t.Log(query)
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user