feat: influxdb.

This commit is contained in:
Bobo
2025-06-26 16:58:49 +08:00
parent d31ab9cdf3
commit 989f5da01f
2 changed files with 247 additions and 0 deletions

171
database/influxdb/utils.go Normal file
View File

@@ -0,0 +1,171 @@
package influxdb
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
"google.golang.org/protobuf/types/known/timestamppb"
)
func GetPointTag(point *influxdb3.Point, name string) *string {
if point == nil {
return nil
}
tagValue, ok := point.GetTag(name)
if !ok || tagValue == "" {
return nil
}
return &tagValue
}
func GetBoolPointTag(point *influxdb3.Point, name string) *bool {
if point == nil {
return nil
}
tagValue, ok := point.GetTag(name)
if !ok || tagValue == "" {
return nil
}
value := tagValue == "true"
return &value
}
func GetUint32PointTag(point *influxdb3.Point, name string) *uint32 {
if point == nil {
return nil
}
tagValue, ok := point.GetTag(name)
if !ok || tagValue == "" {
return nil
}
value, err := strconv.ParseUint(tagValue, 10, 64)
if err != nil {
return nil
}
value32 := uint32(value)
return &value32
}
func GetUint64PointTag(point *influxdb3.Point, name string) *uint64 {
if point == nil {
return nil
}
tagValue, ok := point.GetTag(name)
if !ok || tagValue == "" {
return nil
}
value, err := strconv.ParseUint(tagValue, 10, 64)
if err != nil {
return nil
}
return &value
}
func GetEnumPointTag[T ~int32](point *influxdb3.Point, name string, valueMap map[string]int32) *T {
if point == nil {
return nil
}
tagValue, ok := point.GetTag(name)
if !ok || tagValue == "" {
return nil
}
enumValue, exists := valueMap[tagValue]
if !exists {
return nil
}
enumType := T(enumValue)
return &enumType
}
func GetTimestampField(point *influxdb3.Point, name string) *timestamppb.Timestamp {
if point == nil {
return nil
}
value := point.GetField(name)
if value == nil {
return nil
}
if timestamp, ok := value.(*timestamppb.Timestamp); ok {
return timestamp
}
if timeValue, ok := value.(time.Time); ok {
return timestamppb.New(timeValue)
}
return nil
}
func GetUint32Field(point *influxdb3.Point, name string) *uint32 {
if point == nil {
return nil
}
value := point.GetUIntegerField(name)
if value == nil {
return nil
}
uint32Value := uint32(*value)
if uint32Value == 0 {
return nil
}
return &uint32Value
}
func BoolToString(value *bool) string {
if value == nil {
return "false"
}
if *value {
return "true"
}
return "false"
}
func Uint64ToString(value *uint64) string {
if value == nil {
return "0"
}
return fmt.Sprintf("%d", *value)
}
func BuildQueryWithParams(
table string,
filters map[string]interface{},
operators map[string]string,
fields []string,
) string {
var queryBuilder strings.Builder
// 构建 SELECT 语句
queryBuilder.WriteString("SELECT ")
if len(fields) > 0 {
queryBuilder.WriteString(strings.Join(fields, ", "))
} else {
queryBuilder.WriteString("*")
}
queryBuilder.WriteString(fmt.Sprintf(" FROM %s", table))
// 构建 WHERE 条件
if len(filters) > 0 {
var operator string
queryBuilder.WriteString(" WHERE ")
var conditions []string
for key, value := range filters {
operator = "=" // 默认操作符
if op, exists := operators[key]; exists {
operator = op
}
conditions = append(conditions, fmt.Sprintf("%s %s %v", key, operator, value))
}
queryBuilder.WriteString(strings.Join(conditions, " AND "))
}
return queryBuilder.String()
}

View File

@@ -0,0 +1,76 @@
package influxdb
import (
"testing"
)
func TestBuildQueryWithParams(t *testing.T) {
tests := []struct {
name string
table string
filters map[string]interface{}
operators map[string]string
fields []string
expectedQuery string
}{
{
name: "Basic query with filters and fields",
table: "candles",
filters: map[string]interface{}{"s": "'AAPL'", "o": 150.0},
operators: map[string]string{"o": ">"},
fields: []string{"s", "o", "h", "l", "c", "v"},
expectedQuery: "SELECT s, o, h, l, c, v FROM candles WHERE s = 'AAPL' AND o > 150",
},
{
name: "Query with no filters",
table: "candles",
filters: map[string]interface{}{},
operators: map[string]string{},
fields: []string{"s", "o", "h"},
expectedQuery: "SELECT s, o, h FROM candles",
},
{
name: "Query with no fields",
table: "candles",
filters: map[string]interface{}{"s": "'AAPL'"},
operators: map[string]string{},
fields: []string{},
expectedQuery: "SELECT * FROM candles WHERE s = 'AAPL'",
},
{
name: "Empty table name",
table: "",
filters: map[string]interface{}{"s": "'AAPL'"},
operators: map[string]string{},
fields: []string{"s", "o"},
expectedQuery: "SELECT s, o FROM WHERE s = 'AAPL'",
},
{
name: "Special characters in filters",
table: "candles",
filters: map[string]interface{}{"name": "'O'Reilly'"},
operators: map[string]string{},
fields: []string{"name"},
expectedQuery: "SELECT name FROM candles WHERE name = 'O'Reilly'",
},
{
name: "Query with interval filters",
table: "candles",
filters: map[string]interface{}{"time": "now() - interval '15 minutes'"},
operators: map[string]string{"time": ">="},
fields: []string{"*"},
expectedQuery: "SELECT * FROM candles WHERE time >= now() - interval '15 minutes'",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
query := BuildQueryWithParams(tt.table, tt.filters, tt.operators, tt.fields)
if query != tt.expectedQuery {
t.Errorf("expected query %s, got %s", tt.expectedQuery, query)
}
//t.Log(query)
})
}
}