From 989f5da01f8de8c1f803599362024fb3b4c6e9e8 Mon Sep 17 00:00:00 2001 From: Bobo Date: Thu, 26 Jun 2025 16:58:49 +0800 Subject: [PATCH] feat: influxdb. --- database/influxdb/utils.go | 171 ++++++++++++++++++++++++++++++++ database/influxdb/utils_test.go | 76 ++++++++++++++ 2 files changed, 247 insertions(+) create mode 100644 database/influxdb/utils.go create mode 100644 database/influxdb/utils_test.go diff --git a/database/influxdb/utils.go b/database/influxdb/utils.go new file mode 100644 index 0000000..137ffc1 --- /dev/null +++ b/database/influxdb/utils.go @@ -0,0 +1,171 @@ +package influxdb + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func GetPointTag(point *influxdb3.Point, name string) *string { + if point == nil { + return nil + } + tagValue, ok := point.GetTag(name) + if !ok || tagValue == "" { + return nil + } + return &tagValue +} + +func GetBoolPointTag(point *influxdb3.Point, name string) *bool { + if point == nil { + return nil + } + tagValue, ok := point.GetTag(name) + if !ok || tagValue == "" { + return nil + } + + value := tagValue == "true" + return &value +} + +func GetUint32PointTag(point *influxdb3.Point, name string) *uint32 { + if point == nil { + return nil + } + tagValue, ok := point.GetTag(name) + if !ok || tagValue == "" { + return nil + } + + value, err := strconv.ParseUint(tagValue, 10, 64) + if err != nil { + return nil + } + value32 := uint32(value) + return &value32 +} + +func GetUint64PointTag(point *influxdb3.Point, name string) *uint64 { + if point == nil { + return nil + } + tagValue, ok := point.GetTag(name) + if !ok || tagValue == "" { + return nil + } + + value, err := strconv.ParseUint(tagValue, 10, 64) + if err != nil { + return nil + } + return &value +} + +func GetEnumPointTag[T ~int32](point *influxdb3.Point, name string, valueMap map[string]int32) *T { + if point == nil { + return nil + } + tagValue, ok := point.GetTag(name) + if !ok || tagValue == "" { + return nil + } + enumValue, exists := valueMap[tagValue] + if !exists { + return nil + } + + enumType := T(enumValue) + return &enumType +} + +func GetTimestampField(point *influxdb3.Point, name string) *timestamppb.Timestamp { + if point == nil { + return nil + } + + value := point.GetField(name) + if value == nil { + return nil + } + if timestamp, ok := value.(*timestamppb.Timestamp); ok { + return timestamp + } + if timeValue, ok := value.(time.Time); ok { + return timestamppb.New(timeValue) + } + return nil +} + +func GetUint32Field(point *influxdb3.Point, name string) *uint32 { + if point == nil { + return nil + } + + value := point.GetUIntegerField(name) + if value == nil { + return nil + } + uint32Value := uint32(*value) + if uint32Value == 0 { + return nil + } + return &uint32Value +} + +func BoolToString(value *bool) string { + if value == nil { + return "false" + } + if *value { + return "true" + } + return "false" +} + +func Uint64ToString(value *uint64) string { + if value == nil { + return "0" + } + return fmt.Sprintf("%d", *value) +} + +func BuildQueryWithParams( + table string, + filters map[string]interface{}, + operators map[string]string, + fields []string, +) string { + var queryBuilder strings.Builder + + // 构建 SELECT 语句 + queryBuilder.WriteString("SELECT ") + if len(fields) > 0 { + queryBuilder.WriteString(strings.Join(fields, ", ")) + } else { + queryBuilder.WriteString("*") + } + queryBuilder.WriteString(fmt.Sprintf(" FROM %s", table)) + + // 构建 WHERE 条件 + if len(filters) > 0 { + var operator string + queryBuilder.WriteString(" WHERE ") + var conditions []string + for key, value := range filters { + operator = "=" // 默认操作符 + if op, exists := operators[key]; exists { + operator = op + } + conditions = append(conditions, fmt.Sprintf("%s %s %v", key, operator, value)) + } + queryBuilder.WriteString(strings.Join(conditions, " AND ")) + } + + return queryBuilder.String() +} diff --git a/database/influxdb/utils_test.go b/database/influxdb/utils_test.go new file mode 100644 index 0000000..8245a39 --- /dev/null +++ b/database/influxdb/utils_test.go @@ -0,0 +1,76 @@ +package influxdb + +import ( + "testing" +) + +func TestBuildQueryWithParams(t *testing.T) { + tests := []struct { + name string + table string + filters map[string]interface{} + operators map[string]string + fields []string + expectedQuery string + }{ + { + name: "Basic query with filters and fields", + table: "candles", + filters: map[string]interface{}{"s": "'AAPL'", "o": 150.0}, + operators: map[string]string{"o": ">"}, + fields: []string{"s", "o", "h", "l", "c", "v"}, + expectedQuery: "SELECT s, o, h, l, c, v FROM candles WHERE s = 'AAPL' AND o > 150", + }, + { + name: "Query with no filters", + table: "candles", + filters: map[string]interface{}{}, + operators: map[string]string{}, + fields: []string{"s", "o", "h"}, + expectedQuery: "SELECT s, o, h FROM candles", + }, + { + name: "Query with no fields", + table: "candles", + filters: map[string]interface{}{"s": "'AAPL'"}, + operators: map[string]string{}, + fields: []string{}, + expectedQuery: "SELECT * FROM candles WHERE s = 'AAPL'", + }, + { + name: "Empty table name", + table: "", + filters: map[string]interface{}{"s": "'AAPL'"}, + operators: map[string]string{}, + fields: []string{"s", "o"}, + expectedQuery: "SELECT s, o FROM WHERE s = 'AAPL'", + }, + { + name: "Special characters in filters", + table: "candles", + filters: map[string]interface{}{"name": "'O'Reilly'"}, + operators: map[string]string{}, + fields: []string{"name"}, + expectedQuery: "SELECT name FROM candles WHERE name = 'O'Reilly'", + }, + { + name: "Query with interval filters", + table: "candles", + filters: map[string]interface{}{"time": "now() - interval '15 minutes'"}, + operators: map[string]string{"time": ">="}, + fields: []string{"*"}, + expectedQuery: "SELECT * FROM candles WHERE time >= now() - interval '15 minutes'", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + query := BuildQueryWithParams(tt.table, tt.filters, tt.operators, tt.fields) + + if query != tt.expectedQuery { + t.Errorf("expected query %s, got %s", tt.expectedQuery, query) + } + //t.Log(query) + }) + } +}