Compare commits
5 Commits
database/e
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| c2726db8a1 | |||
|
|
47c72651db | ||
|
|
f267c19c73 | ||
|
|
8c017a34e0 | ||
|
|
ac6f0d1987 |
@@ -191,6 +191,7 @@ type Logger_Logrus struct {
|
|||||||
TimestampFormat string `protobuf:"bytes,3,opt,name=timestamp_format,json=timestampFormat,proto3" json:"timestamp_format,omitempty"` // 定义时间戳格式,例如:"2006-01-02 15:04:05"
|
TimestampFormat string `protobuf:"bytes,3,opt,name=timestamp_format,json=timestampFormat,proto3" json:"timestamp_format,omitempty"` // 定义时间戳格式,例如:"2006-01-02 15:04:05"
|
||||||
DisableColors bool `protobuf:"varint,4,opt,name=disable_colors,json=disableColors,proto3" json:"disable_colors,omitempty"` // 不需要彩色日志
|
DisableColors bool `protobuf:"varint,4,opt,name=disable_colors,json=disableColors,proto3" json:"disable_colors,omitempty"` // 不需要彩色日志
|
||||||
DisableTimestamp bool `protobuf:"varint,5,opt,name=disable_timestamp,json=disableTimestamp,proto3" json:"disable_timestamp,omitempty"` // 不需要时间戳
|
DisableTimestamp bool `protobuf:"varint,5,opt,name=disable_timestamp,json=disableTimestamp,proto3" json:"disable_timestamp,omitempty"` // 不需要时间戳
|
||||||
|
ForceColors bool `protobuf:"varint,6,opt,name=force_colors,json=forceColors,proto3" json:"force_colors,omitempty"` // 是否开启彩色日志
|
||||||
unknownFields protoimpl.UnknownFields
|
unknownFields protoimpl.UnknownFields
|
||||||
sizeCache protoimpl.SizeCache
|
sizeCache protoimpl.SizeCache
|
||||||
}
|
}
|
||||||
@@ -260,6 +261,13 @@ func (x *Logger_Logrus) GetDisableTimestamp() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *Logger_Logrus) GetForceColors() bool {
|
||||||
|
if x != nil {
|
||||||
|
return x.ForceColors
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// Fluent
|
// Fluent
|
||||||
type Logger_Fluent struct {
|
type Logger_Fluent struct {
|
||||||
state protoimpl.MessageState `protogen:"open.v1"`
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
@@ -447,7 +455,7 @@ var File_conf_v1_kratos_conf_logger_proto protoreflect.FileDescriptor
|
|||||||
|
|
||||||
const file_conf_v1_kratos_conf_logger_proto_rawDesc = "" +
|
const file_conf_v1_kratos_conf_logger_proto_rawDesc = "" +
|
||||||
"\n" +
|
"\n" +
|
||||||
" conf/v1/kratos_conf_logger.proto\x12\x04conf\"\xc4\a\n" +
|
" conf/v1/kratos_conf_logger.proto\x12\x04conf\"\xe7\a\n" +
|
||||||
"\x06Logger\x12\x12\n" +
|
"\x06Logger\x12\x12\n" +
|
||||||
"\x04type\x18\x01 \x01(\tR\x04type\x12'\n" +
|
"\x04type\x18\x01 \x01(\tR\x04type\x12'\n" +
|
||||||
"\x03zap\x18\x02 \x01(\v2\x10.conf.Logger.ZapH\x00R\x03zap\x88\x01\x01\x120\n" +
|
"\x03zap\x18\x02 \x01(\v2\x10.conf.Logger.ZapH\x00R\x03zap\x88\x01\x01\x120\n" +
|
||||||
@@ -461,13 +469,14 @@ const file_conf_v1_kratos_conf_logger_proto_rawDesc = "" +
|
|||||||
"\bmax_size\x18\x03 \x01(\x05R\amaxSize\x12\x17\n" +
|
"\bmax_size\x18\x03 \x01(\x05R\amaxSize\x12\x17\n" +
|
||||||
"\amax_age\x18\x04 \x01(\x05R\x06maxAge\x12\x1f\n" +
|
"\amax_age\x18\x04 \x01(\x05R\x06maxAge\x12\x1f\n" +
|
||||||
"\vmax_backups\x18\x05 \x01(\x05R\n" +
|
"\vmax_backups\x18\x05 \x01(\x05R\n" +
|
||||||
"maxBackups\x1a\xbb\x01\n" +
|
"maxBackups\x1a\xde\x01\n" +
|
||||||
"\x06Logrus\x12\x14\n" +
|
"\x06Logrus\x12\x14\n" +
|
||||||
"\x05level\x18\x01 \x01(\tR\x05level\x12\x1c\n" +
|
"\x05level\x18\x01 \x01(\tR\x05level\x12\x1c\n" +
|
||||||
"\tformatter\x18\x02 \x01(\tR\tformatter\x12)\n" +
|
"\tformatter\x18\x02 \x01(\tR\tformatter\x12)\n" +
|
||||||
"\x10timestamp_format\x18\x03 \x01(\tR\x0ftimestampFormat\x12%\n" +
|
"\x10timestamp_format\x18\x03 \x01(\tR\x0ftimestampFormat\x12%\n" +
|
||||||
"\x0edisable_colors\x18\x04 \x01(\bR\rdisableColors\x12+\n" +
|
"\x0edisable_colors\x18\x04 \x01(\bR\rdisableColors\x12+\n" +
|
||||||
"\x11disable_timestamp\x18\x05 \x01(\bR\x10disableTimestamp\x1a$\n" +
|
"\x11disable_timestamp\x18\x05 \x01(\bR\x10disableTimestamp\x12!\n" +
|
||||||
|
"\fforce_colors\x18\x06 \x01(\bR\vforceColors\x1a$\n" +
|
||||||
"\x06Fluent\x12\x1a\n" +
|
"\x06Fluent\x12\x1a\n" +
|
||||||
"\bendpoint\x18\x01 \x01(\tR\bendpoint\x1a\x82\x01\n" +
|
"\bendpoint\x18\x01 \x01(\tR\bendpoint\x1a\x82\x01\n" +
|
||||||
"\x06Aliyun\x12\x1a\n" +
|
"\x06Aliyun\x12\x1a\n" +
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ message Logger {
|
|||||||
string timestamp_format = 3; // 定义时间戳格式,例如:"2006-01-02 15:04:05"
|
string timestamp_format = 3; // 定义时间戳格式,例如:"2006-01-02 15:04:05"
|
||||||
bool disable_colors = 4; // 不需要彩色日志
|
bool disable_colors = 4; // 不需要彩色日志
|
||||||
bool disable_timestamp = 5; // 不需要时间戳
|
bool disable_timestamp = 5; // 不需要时间戳
|
||||||
|
bool force_colors = 6; // 是否开启彩色日志
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fluent
|
// Fluent
|
||||||
|
|||||||
@@ -167,14 +167,15 @@ func appendStructToBatch(batch driverV2.Batch, obj interface{}, columns []string
|
|||||||
field := t.Field(j)
|
field := t.Field(j)
|
||||||
|
|
||||||
// 检查ch标签
|
// 检查ch标签
|
||||||
if tag := field.Tag.Get("ch"); tag == col {
|
if tag := field.Tag.Get("ch"); strings.TrimSpace(tag) == col {
|
||||||
values[i] = v.Field(j).Interface()
|
values[i] = v.Field(j).Interface()
|
||||||
found = true
|
found = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// 检查json标签
|
// 检查json标签
|
||||||
if tag := field.Tag.Get("json"); tag == col {
|
jsonTags := strings.Split(field.Tag.Get("json"), ",")
|
||||||
|
if len(jsonTags) > 0 && strings.TrimSpace(jsonTags[0]) == col {
|
||||||
values[i] = v.Field(j).Interface()
|
values[i] = v.Field(j).Interface()
|
||||||
found = true
|
found = true
|
||||||
break
|
break
|
||||||
|
|||||||
@@ -4,13 +4,15 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
|
||||||
clickhouseV2 "github.com/ClickHouse/clickhouse-go/v2"
|
clickhouseV2 "github.com/ClickHouse/clickhouse-go/v2"
|
||||||
driverV2 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
driverV2 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||||
|
|
||||||
"github.com/go-kratos/kratos/v2/log"
|
"github.com/go-kratos/kratos/v2/log"
|
||||||
|
|
||||||
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
|
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
|
||||||
"github.com/tx7do/kratos-bootstrap/utils"
|
"github.com/tx7do/kratos-bootstrap/utils"
|
||||||
)
|
)
|
||||||
@@ -229,7 +231,7 @@ func (c *Client) CheckConnection(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Query 执行查询并返回结果
|
// Query 执行查询并返回结果
|
||||||
func (c *Client) Query(ctx context.Context, creator Creator, results *[]any, query string, args ...interface{}) error {
|
func (c *Client) Query(ctx context.Context, creator Creator, results *[]any, query string, args ...any) error {
|
||||||
if c.conn == nil {
|
if c.conn == nil {
|
||||||
c.log.Error("clickhouse client is not initialized")
|
c.log.Error("clickhouse client is not initialized")
|
||||||
return ErrClientNotInitialized
|
return ErrClientNotInitialized
|
||||||
@@ -269,7 +271,7 @@ func (c *Client) Query(ctx context.Context, creator Creator, results *[]any, que
|
|||||||
}
|
}
|
||||||
|
|
||||||
// QueryRow 执行查询并返回单行结果
|
// QueryRow 执行查询并返回单行结果
|
||||||
func (c *Client) QueryRow(ctx context.Context, dest any, query string, args ...interface{}) error {
|
func (c *Client) QueryRow(ctx context.Context, dest any, query string, args ...any) error {
|
||||||
row := c.conn.QueryRow(ctx, query, args...)
|
row := c.conn.QueryRow(ctx, query, args...)
|
||||||
if row == nil {
|
if row == nil {
|
||||||
c.log.Error("query row returned nil")
|
c.log.Error("query row returned nil")
|
||||||
@@ -285,7 +287,7 @@ func (c *Client) QueryRow(ctx context.Context, dest any, query string, args ...i
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Select 封装 SELECT 子句
|
// Select 封装 SELECT 子句
|
||||||
func (c *Client) Select(ctx context.Context, dest any, query string, args ...interface{}) error {
|
func (c *Client) Select(ctx context.Context, dest any, query string, args ...any) error {
|
||||||
if c.conn == nil {
|
if c.conn == nil {
|
||||||
c.log.Error("clickhouse client is not initialized")
|
c.log.Error("clickhouse client is not initialized")
|
||||||
return ErrClientNotInitialized
|
return ErrClientNotInitialized
|
||||||
@@ -301,7 +303,7 @@ func (c *Client) Select(ctx context.Context, dest any, query string, args ...int
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Exec 执行非查询语句
|
// Exec 执行非查询语句
|
||||||
func (c *Client) Exec(ctx context.Context, query string, args ...interface{}) error {
|
func (c *Client) Exec(ctx context.Context, query string, args ...any) error {
|
||||||
if c.conn == nil {
|
if c.conn == nil {
|
||||||
c.log.Error("clickhouse client is not initialized")
|
c.log.Error("clickhouse client is not initialized")
|
||||||
return ErrClientNotInitialized
|
return ErrClientNotInitialized
|
||||||
@@ -315,23 +317,268 @@ func (c *Client) Exec(ctx context.Context, query string, args ...interface{}) er
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AsyncInsert 异步插入数据
|
func (c *Client) prepareInsertData(data any) (string, string, []any, error) {
|
||||||
func (c *Client) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
|
val := reflect.ValueOf(data)
|
||||||
|
if val.Kind() != reflect.Ptr || val.IsNil() {
|
||||||
|
return "", "", nil, fmt.Errorf("data must be a non-nil pointer")
|
||||||
|
}
|
||||||
|
|
||||||
|
val = val.Elem()
|
||||||
|
typ := val.Type()
|
||||||
|
|
||||||
|
columns := make([]string, 0, typ.NumField())
|
||||||
|
placeholders := make([]string, 0, typ.NumField())
|
||||||
|
values := make([]any, 0, typ.NumField())
|
||||||
|
|
||||||
|
values = structToValueArray(data)
|
||||||
|
|
||||||
|
for i := 0; i < typ.NumField(); i++ {
|
||||||
|
field := typ.Field(i)
|
||||||
|
|
||||||
|
// 优先获取 `ch` 标签,其次获取 `json` 标签,最后使用字段名
|
||||||
|
columnName := field.Tag.Get("ch")
|
||||||
|
if columnName == "" {
|
||||||
|
jsonTag := field.Tag.Get("json")
|
||||||
|
if jsonTag != "" {
|
||||||
|
tags := strings.Split(jsonTag, ",") // 只取逗号前的部分
|
||||||
|
if len(tags) > 0 {
|
||||||
|
columnName = tags[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if columnName == "" {
|
||||||
|
columnName = field.Name
|
||||||
|
}
|
||||||
|
//columnName = strings.TrimSpace(columnName)
|
||||||
|
|
||||||
|
columns = append(columns, columnName)
|
||||||
|
placeholders = append(placeholders, "?")
|
||||||
|
}
|
||||||
|
|
||||||
|
return strings.Join(columns, ", "), strings.Join(placeholders, ", "), values, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert 插入数据到指定表
|
||||||
|
func (c *Client) Insert(ctx context.Context, tableName string, in any) error {
|
||||||
if c.conn == nil {
|
if c.conn == nil {
|
||||||
c.log.Error("clickhouse client is not initialized")
|
c.log.Error("clickhouse client is not initialized")
|
||||||
return ErrClientNotInitialized
|
return ErrClientNotInitialized
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.conn.AsyncInsert(ctx, query, wait, args...); err != nil {
|
columns, placeholders, values, err := c.prepareInsertData(in)
|
||||||
c.log.Errorf("exec failed: %v", err)
|
if err != nil {
|
||||||
|
c.log.Errorf("prepare insert in failed: %v", err)
|
||||||
|
return ErrPrepareInsertDataFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构造 SQL 语句
|
||||||
|
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
|
||||||
|
tableName,
|
||||||
|
columns,
|
||||||
|
placeholders,
|
||||||
|
)
|
||||||
|
|
||||||
|
// 执行插入操作
|
||||||
|
if err = c.conn.Exec(ctx, query, values...); err != nil {
|
||||||
|
c.log.Errorf("insert failed: %v", err)
|
||||||
|
return ErrInsertFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) InsertMany(ctx context.Context, tableName string, data []any) error {
|
||||||
|
if c.conn == nil {
|
||||||
|
c.log.Error("clickhouse client is not initialized")
|
||||||
|
return ErrClientNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(data) == 0 {
|
||||||
|
c.log.Error("data slice is empty")
|
||||||
|
return ErrInvalidColumnData
|
||||||
|
}
|
||||||
|
|
||||||
|
var columns string
|
||||||
|
var placeholders []string
|
||||||
|
var values []any
|
||||||
|
|
||||||
|
for _, item := range data {
|
||||||
|
itemColumns, itemPlaceholders, itemValues, err := c.prepareInsertData(item)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Errorf("prepare insert data failed: %v", err)
|
||||||
|
return ErrPrepareInsertDataFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
if columns == "" {
|
||||||
|
columns = itemColumns
|
||||||
|
} else if columns != itemColumns {
|
||||||
|
c.log.Error("data items have inconsistent columns")
|
||||||
|
return ErrInvalidColumnData
|
||||||
|
}
|
||||||
|
|
||||||
|
placeholders = append(placeholders, fmt.Sprintf("(%s)", itemPlaceholders))
|
||||||
|
values = append(values, itemValues...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构造 SQL 语句
|
||||||
|
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
|
||||||
|
tableName,
|
||||||
|
columns,
|
||||||
|
strings.Join(placeholders, ", "),
|
||||||
|
)
|
||||||
|
|
||||||
|
// 执行插入操作
|
||||||
|
if err := c.conn.Exec(ctx, query, values...); err != nil {
|
||||||
|
c.log.Errorf("insert many failed: %v", err)
|
||||||
|
return ErrInsertFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AsyncInsert 异步插入数据
|
||||||
|
func (c *Client) AsyncInsert(ctx context.Context, tableName string, data any, wait bool) error {
|
||||||
|
if c.conn == nil {
|
||||||
|
c.log.Error("clickhouse client is not initialized")
|
||||||
|
return ErrClientNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
|
// 准备插入数据
|
||||||
|
columns, placeholders, values, err := c.prepareInsertData(data)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Errorf("prepare insert data failed: %v", err)
|
||||||
|
return ErrPrepareInsertDataFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构造 SQL 语句
|
||||||
|
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
|
||||||
|
tableName,
|
||||||
|
columns,
|
||||||
|
placeholders,
|
||||||
|
)
|
||||||
|
|
||||||
|
// 执行异步插入
|
||||||
|
if err = c.asyncInsert(ctx, query, wait, values...); err != nil {
|
||||||
|
c.log.Errorf("async insert failed: %v", err)
|
||||||
return ErrAsyncInsertFailed
|
return ErrAsyncInsertFailed
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// asyncInsert 异步插入数据
|
||||||
|
func (c *Client) asyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
|
||||||
|
if c.conn == nil {
|
||||||
|
c.log.Error("clickhouse client is not initialized")
|
||||||
|
return ErrClientNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.conn.AsyncInsert(ctx, query, wait, args...); err != nil {
|
||||||
|
c.log.Errorf("async insert failed: %v", err)
|
||||||
|
return ErrAsyncInsertFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AsyncInsertMany 批量异步插入数据
|
||||||
|
func (c *Client) AsyncInsertMany(ctx context.Context, tableName string, data []any, wait bool) error {
|
||||||
|
if c.conn == nil {
|
||||||
|
c.log.Error("clickhouse client is not initialized")
|
||||||
|
return ErrClientNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(data) == 0 {
|
||||||
|
c.log.Error("data slice is empty")
|
||||||
|
return ErrInvalidColumnData
|
||||||
|
}
|
||||||
|
|
||||||
|
// 准备插入数据的列名和占位符
|
||||||
|
var columns string
|
||||||
|
var placeholders []string
|
||||||
|
var values []any
|
||||||
|
|
||||||
|
for _, item := range data {
|
||||||
|
itemColumns, itemPlaceholders, itemValues, err := c.prepareInsertData(item)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Errorf("prepare insert data failed: %v", err)
|
||||||
|
return ErrPrepareInsertDataFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
if columns == "" {
|
||||||
|
columns = itemColumns
|
||||||
|
} else if columns != itemColumns {
|
||||||
|
c.log.Error("data items have inconsistent columns")
|
||||||
|
return ErrInvalidColumnData
|
||||||
|
}
|
||||||
|
|
||||||
|
placeholders = append(placeholders, fmt.Sprintf("(%s)", itemPlaceholders))
|
||||||
|
values = append(values, itemValues...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构造 SQL 语句
|
||||||
|
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
|
||||||
|
tableName,
|
||||||
|
columns,
|
||||||
|
strings.Join(placeholders, ", "),
|
||||||
|
)
|
||||||
|
|
||||||
|
// 执行异步插入操作
|
||||||
|
if err := c.asyncInsert(ctx, query, wait, values...); err != nil {
|
||||||
|
c.log.Errorf("batch insert failed: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// BatchInsert 批量插入数据
|
// BatchInsert 批量插入数据
|
||||||
func (c *Client) BatchInsert(ctx context.Context, query string, data [][]interface{}) error {
|
func (c *Client) BatchInsert(ctx context.Context, tableName string, data []any) error {
|
||||||
|
if c.conn == nil {
|
||||||
|
c.log.Error("clickhouse client is not initialized")
|
||||||
|
return ErrClientNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(data) == 0 {
|
||||||
|
c.log.Error("data slice is empty")
|
||||||
|
return ErrInvalidColumnData
|
||||||
|
}
|
||||||
|
|
||||||
|
// 准备插入数据的列名和占位符
|
||||||
|
var columns string
|
||||||
|
var values [][]any
|
||||||
|
|
||||||
|
for _, item := range data {
|
||||||
|
itemColumns, _, itemValues, err := c.prepareInsertData(item)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Errorf("prepare insert data failed: %v", err)
|
||||||
|
return ErrPrepareInsertDataFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
if columns == "" {
|
||||||
|
columns = itemColumns
|
||||||
|
} else if columns != itemColumns {
|
||||||
|
c.log.Error("data items have inconsistent columns")
|
||||||
|
return ErrInvalidColumnData
|
||||||
|
}
|
||||||
|
|
||||||
|
values = append(values, itemValues)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构造 SQL 语句
|
||||||
|
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES", tableName, columns)
|
||||||
|
|
||||||
|
// 调用 batchExec 方法执行批量插入
|
||||||
|
if err := c.batchExec(ctx, query, values); err != nil {
|
||||||
|
c.log.Errorf("batch insert failed: %v", err)
|
||||||
|
return ErrBatchInsertFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// batchExec 执行批量操作
|
||||||
|
func (c *Client) batchExec(ctx context.Context, query string, data [][]any) error {
|
||||||
batch, err := c.conn.PrepareBatch(ctx, query)
|
batch, err := c.conn.PrepareBatch(ctx, query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Errorf("failed to prepare batch: %v", err)
|
c.log.Errorf("failed to prepare batch: %v", err)
|
||||||
@@ -339,8 +586,8 @@ func (c *Client) BatchInsert(ctx context.Context, query string, data [][]interfa
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, row := range data {
|
for _, row := range data {
|
||||||
if err := batch.Append(row...); err != nil {
|
if err = batch.Append(row...); err != nil {
|
||||||
c.log.Errorf("failed to append data: %v", err)
|
c.log.Errorf("failed to append batch data: %v", err)
|
||||||
return ErrBatchAppendFailed
|
return ErrBatchAppendFailed
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -352,3 +599,34 @@ func (c *Client) BatchInsert(ctx context.Context, query string, data [][]interfa
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BatchStructs 批量插入结构体数据
|
||||||
|
func (c *Client) BatchStructs(ctx context.Context, query string, data []any) error {
|
||||||
|
if c.conn == nil {
|
||||||
|
c.log.Error("clickhouse client is not initialized")
|
||||||
|
return ErrClientNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
|
// 准备批量插入
|
||||||
|
batch, err := c.conn.PrepareBatch(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
c.log.Errorf("failed to prepare batch: %v", err)
|
||||||
|
return ErrBatchPrepareFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
// 遍历数据并添加到批量插入
|
||||||
|
for _, row := range data {
|
||||||
|
if err := batch.AppendStruct(row); err != nil {
|
||||||
|
c.log.Errorf("failed to append batch struct data: %v", err)
|
||||||
|
return ErrBatchAppendFailed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 发送批量插入
|
||||||
|
if err = batch.Send(); err != nil {
|
||||||
|
c.log.Errorf("failed to send batch: %v", err)
|
||||||
|
return ErrBatchSendFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -11,28 +11,25 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Candle struct {
|
type Candle struct {
|
||||||
Symbol string `json:"symbol" ch:"symbol"`
|
Timestamp *time.Time `json:"timestamp" ch:"timestamp"`
|
||||||
Open float64 `json:"open" ch:"open"`
|
Symbol *string `json:"symbol" ch:"symbol"`
|
||||||
High float64 `json:"high" ch:"high"`
|
Open *float64 `json:"open" ch:"open"`
|
||||||
Low float64 `json:"low" ch:"low"`
|
High *float64 `json:"high" ch:"high"`
|
||||||
Close float64 `json:"close" ch:"close"`
|
Low *float64 `json:"low" ch:"low"`
|
||||||
Volume float64 `json:"volume" ch:"volume"`
|
Close *float64 `json:"close" ch:"close"`
|
||||||
Timestamp time.Time `json:"timestamp" ch:"timestamp"`
|
Volume *float64 `json:"volume" ch:"volume"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func createTestClient() *Client {
|
func createTestClient() *Client {
|
||||||
database := "finances"
|
|
||||||
username := "default"
|
|
||||||
password := "*Abcd123456"
|
|
||||||
cli, _ := NewClient(
|
cli, _ := NewClient(
|
||||||
log.DefaultLogger,
|
log.DefaultLogger,
|
||||||
&conf.Bootstrap{
|
&conf.Bootstrap{
|
||||||
Data: &conf.Data{
|
Data: &conf.Data{
|
||||||
Clickhouse: &conf.Data_ClickHouse{
|
Clickhouse: &conf.Data_ClickHouse{
|
||||||
Addresses: []string{"localhost:9000"},
|
Addresses: []string{"localhost:9000"},
|
||||||
Database: &database,
|
Database: Ptr("finances"),
|
||||||
Username: &username,
|
Username: Ptr("default"),
|
||||||
Password: &password,
|
Password: Ptr("*Abcd123456"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -44,7 +41,7 @@ func createCandlesTable(client *Client) {
|
|||||||
// 创建表的 SQL 语句
|
// 创建表的 SQL 语句
|
||||||
createTableQuery := `
|
createTableQuery := `
|
||||||
CREATE TABLE IF NOT EXISTS candles (
|
CREATE TABLE IF NOT EXISTS candles (
|
||||||
timestamp DateTime,
|
timestamp DateTime64(3),
|
||||||
symbol String,
|
symbol String,
|
||||||
open Float64,
|
open Float64,
|
||||||
high Float64,
|
high Float64,
|
||||||
@@ -76,32 +73,148 @@ func TestNewClient(t *testing.T) {
|
|||||||
createCandlesTable(client)
|
createCandlesTable(client)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAsyncInsert(t *testing.T) {
|
func TestInsertCandlesTable(t *testing.T) {
|
||||||
client := createTestClient()
|
client := createTestClient()
|
||||||
assert.NotNil(t, client)
|
assert.NotNil(t, client)
|
||||||
|
|
||||||
// 测试异步插入
|
createCandlesTable(client)
|
||||||
err := client.AsyncInsert(context.Background(), "INSERT INTO test_table (id, name) VALUES (?, ?)", true, 1, "example")
|
|
||||||
assert.NoError(t, err, "AsyncInsert 应该成功执行")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchInsert(t *testing.T) {
|
|
||||||
client := createTestClient()
|
|
||||||
assert.NotNil(t, client)
|
|
||||||
|
|
||||||
// 测试数据
|
// 测试数据
|
||||||
data := [][]interface{}{
|
candle := &Candle{
|
||||||
{1, "example1"},
|
Timestamp: Ptr(time.Now()),
|
||||||
{2, "example2"},
|
Symbol: Ptr("AAPL"),
|
||||||
{3, "example3"},
|
Open: Ptr(100.5),
|
||||||
|
High: Ptr(105.0),
|
||||||
|
Low: Ptr(99.5),
|
||||||
|
Close: Ptr(102.0),
|
||||||
|
Volume: Ptr(1500.0),
|
||||||
}
|
}
|
||||||
|
|
||||||
// 测试批量插入
|
// 插入数据
|
||||||
err := client.BatchInsert(context.Background(), "INSERT INTO test_table (id, name) VALUES (?, ?)", data)
|
err := client.Insert(context.Background(), "candles", candle)
|
||||||
assert.NoError(t, err, "BatchInsert 应该成功执行")
|
assert.NoError(t, err, "InsertCandlesTable 应该成功执行")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInsertIntoCandlesTable(t *testing.T) {
|
func TestInsertManyCandlesTable(t *testing.T) {
|
||||||
|
client := createTestClient()
|
||||||
|
assert.NotNil(t, client)
|
||||||
|
|
||||||
|
createCandlesTable(client)
|
||||||
|
|
||||||
|
// 测试数据
|
||||||
|
data := []any{
|
||||||
|
&Candle{
|
||||||
|
Timestamp: Ptr(time.Now()),
|
||||||
|
Symbol: Ptr("AAPL"),
|
||||||
|
Open: Ptr(100.5),
|
||||||
|
High: Ptr(105.0),
|
||||||
|
Low: Ptr(99.5),
|
||||||
|
Close: Ptr(102.0),
|
||||||
|
Volume: Ptr(1500.0),
|
||||||
|
},
|
||||||
|
&Candle{
|
||||||
|
Timestamp: Ptr(time.Now()),
|
||||||
|
Symbol: Ptr("GOOG"),
|
||||||
|
Open: Ptr(200.5),
|
||||||
|
High: Ptr(205.0),
|
||||||
|
Low: Ptr(199.5),
|
||||||
|
Close: Ptr(202.0),
|
||||||
|
Volume: Ptr(2500.0),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 插入数据
|
||||||
|
err := client.InsertMany(context.Background(), "candles", data)
|
||||||
|
assert.NoError(t, err, "InsertManyCandlesTable 应该成功执行")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAsyncInsertCandlesTable(t *testing.T) {
|
||||||
|
client := createTestClient()
|
||||||
|
assert.NotNil(t, client)
|
||||||
|
|
||||||
|
createCandlesTable(client)
|
||||||
|
|
||||||
|
// 测试数据
|
||||||
|
candle := &Candle{
|
||||||
|
Timestamp: Ptr(time.Now()),
|
||||||
|
Symbol: Ptr("BTC/USD"),
|
||||||
|
Open: Ptr(30000.0),
|
||||||
|
High: Ptr(31000.0),
|
||||||
|
Low: Ptr(29000.0),
|
||||||
|
Close: Ptr(30500.0),
|
||||||
|
Volume: Ptr(500.0),
|
||||||
|
}
|
||||||
|
|
||||||
|
// 异步插入数据
|
||||||
|
err := client.AsyncInsert(context.Background(), "candles", candle, true)
|
||||||
|
assert.NoError(t, err, "AsyncInsert 方法应该成功执行")
|
||||||
|
|
||||||
|
// 验证插入结果
|
||||||
|
query := `
|
||||||
|
SELECT timestamp, symbol, open, high, low, close, volume
|
||||||
|
FROM candles
|
||||||
|
WHERE symbol = ?
|
||||||
|
`
|
||||||
|
var result Candle
|
||||||
|
err = client.QueryRow(context.Background(), &result, query, "BTC/USD")
|
||||||
|
assert.NoError(t, err, "QueryRow 应该成功执行")
|
||||||
|
assert.Equal(t, "BTC/USD", *result.Symbol, "symbol 列值应该为 BTC/USD")
|
||||||
|
assert.Equal(t, 30500.0, *result.Close, "close 列值应该为 30500.0")
|
||||||
|
assert.Equal(t, 500.0, *result.Volume, "volume 列值应该为 500.0")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAsyncInsertManyCandlesTable(t *testing.T) {
|
||||||
|
client := createTestClient()
|
||||||
|
assert.NotNil(t, client)
|
||||||
|
|
||||||
|
createCandlesTable(client)
|
||||||
|
|
||||||
|
// 测试数据
|
||||||
|
data := []any{
|
||||||
|
&Candle{
|
||||||
|
Timestamp: Ptr(time.Now()),
|
||||||
|
Symbol: Ptr("AAPL"),
|
||||||
|
Open: Ptr(100.5),
|
||||||
|
High: Ptr(105.0),
|
||||||
|
Low: Ptr(99.5),
|
||||||
|
Close: Ptr(102.0),
|
||||||
|
Volume: Ptr(1500.0),
|
||||||
|
},
|
||||||
|
&Candle{
|
||||||
|
Timestamp: Ptr(time.Now()),
|
||||||
|
Symbol: Ptr("GOOG"),
|
||||||
|
Open: Ptr(200.5),
|
||||||
|
High: Ptr(205.0),
|
||||||
|
Low: Ptr(199.5),
|
||||||
|
Close: Ptr(202.0),
|
||||||
|
Volume: Ptr(2500.0),
|
||||||
|
},
|
||||||
|
&Candle{
|
||||||
|
Timestamp: Ptr(time.Now()),
|
||||||
|
Symbol: Ptr("MSFT"),
|
||||||
|
Open: Ptr(300.5),
|
||||||
|
High: Ptr(305.0),
|
||||||
|
Low: Ptr(299.5),
|
||||||
|
Close: Ptr(302.0),
|
||||||
|
Volume: Ptr(3500.0),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 批量插入数据
|
||||||
|
err := client.AsyncInsertMany(context.Background(), "candles", data, true)
|
||||||
|
assert.NoError(t, err, "AsyncInsertMany 方法应该成功执行")
|
||||||
|
|
||||||
|
// 验证插入结果
|
||||||
|
query := `
|
||||||
|
SELECT timestamp, symbol, open, high, low, close, volume
|
||||||
|
FROM candles
|
||||||
|
`
|
||||||
|
var results []Candle
|
||||||
|
err = client.Select(context.Background(), &results, query)
|
||||||
|
assert.NoError(t, err, "查询数据应该成功执行")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInternalBatchExecCandlesTable(t *testing.T) {
|
||||||
client := createTestClient()
|
client := createTestClient()
|
||||||
assert.NotNil(t, client)
|
assert.NotNil(t, client)
|
||||||
|
|
||||||
@@ -114,7 +227,121 @@ func TestInsertIntoCandlesTable(t *testing.T) {
|
|||||||
`
|
`
|
||||||
|
|
||||||
// 测试数据
|
// 测试数据
|
||||||
err := client.AsyncInsert(context.Background(), insertQuery, true,
|
data := [][]interface{}{
|
||||||
|
{"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0},
|
||||||
|
{"2023-10-01 12:01:00", "GOOG", 200.5, 205.0, 199.5, 202.0, 2500.0},
|
||||||
|
{"2023-10-01 12:02:00", "MSFT", 300.5, 305.0, 299.5, 302.0, 3500.0},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 批量插入数据
|
||||||
|
err := client.batchExec(context.Background(), insertQuery, data)
|
||||||
|
assert.NoError(t, err, "batchExec 应该成功执行")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBatchInsertCandlesTable(t *testing.T) {
|
||||||
|
client := createTestClient()
|
||||||
|
assert.NotNil(t, client)
|
||||||
|
|
||||||
|
createCandlesTable(client)
|
||||||
|
|
||||||
|
// 测试数据
|
||||||
|
data := []any{
|
||||||
|
&Candle{
|
||||||
|
Timestamp: Ptr(time.Now()),
|
||||||
|
Symbol: Ptr("AAPL"),
|
||||||
|
Open: Ptr(100.5),
|
||||||
|
High: Ptr(105.0),
|
||||||
|
Low: Ptr(99.5),
|
||||||
|
Close: Ptr(102.0),
|
||||||
|
Volume: Ptr(1500.0),
|
||||||
|
},
|
||||||
|
&Candle{
|
||||||
|
Timestamp: Ptr(time.Now()),
|
||||||
|
Symbol: Ptr("GOOG"),
|
||||||
|
Open: Ptr(200.5),
|
||||||
|
High: Ptr(205.0),
|
||||||
|
Low: Ptr(199.5),
|
||||||
|
Close: Ptr(202.0),
|
||||||
|
Volume: Ptr(2500.0),
|
||||||
|
},
|
||||||
|
&Candle{
|
||||||
|
Timestamp: Ptr(time.Now()),
|
||||||
|
Symbol: Ptr("MSFT"),
|
||||||
|
Open: Ptr(300.5),
|
||||||
|
High: Ptr(305.0),
|
||||||
|
Low: Ptr(299.5),
|
||||||
|
Close: Ptr(302.0),
|
||||||
|
Volume: Ptr(3500.0),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 批量插入数据
|
||||||
|
err := client.BatchInsert(context.Background(), "candles", data)
|
||||||
|
assert.NoError(t, err, "BatchInsert 方法应该成功执行")
|
||||||
|
|
||||||
|
// 验证插入结果
|
||||||
|
query := `
|
||||||
|
SELECT timestamp, symbol, open, high, low, close, volume
|
||||||
|
FROM candles
|
||||||
|
`
|
||||||
|
var results []Candle
|
||||||
|
err = client.Select(context.Background(), &results, query)
|
||||||
|
assert.NoError(t, err, "查询数据应该成功执行")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBatchStructsCandlesTable(t *testing.T) {
|
||||||
|
client := createTestClient()
|
||||||
|
assert.NotNil(t, client)
|
||||||
|
|
||||||
|
createCandlesTable(client)
|
||||||
|
|
||||||
|
// 插入数据的 SQL 语句
|
||||||
|
insertQuery := `
|
||||||
|
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||||
|
`
|
||||||
|
|
||||||
|
// 测试数据
|
||||||
|
data := []any{
|
||||||
|
&Candle{
|
||||||
|
Timestamp: Ptr(time.Now()),
|
||||||
|
Symbol: Ptr("AAPL"),
|
||||||
|
Open: Ptr(100.5),
|
||||||
|
High: Ptr(105.0),
|
||||||
|
Low: Ptr(99.5),
|
||||||
|
Close: Ptr(102.0),
|
||||||
|
Volume: Ptr(1500.0),
|
||||||
|
},
|
||||||
|
&Candle{
|
||||||
|
Timestamp: Ptr(time.Now()),
|
||||||
|
Symbol: Ptr("GOOG"),
|
||||||
|
Open: Ptr(200.5),
|
||||||
|
High: Ptr(205.0),
|
||||||
|
Low: Ptr(199.5),
|
||||||
|
Close: Ptr(202.0),
|
||||||
|
Volume: Ptr(2500.0),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 批量插入数据
|
||||||
|
err := client.BatchStructs(context.Background(), insertQuery, data)
|
||||||
|
assert.NoError(t, err, "BatchStructsCandlesTable 应该成功执行")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInternalAsyncInsertIntoCandlesTable(t *testing.T) {
|
||||||
|
client := createTestClient()
|
||||||
|
assert.NotNil(t, client)
|
||||||
|
|
||||||
|
createCandlesTable(client)
|
||||||
|
|
||||||
|
// 插入数据的 SQL 语句
|
||||||
|
insertQuery := `
|
||||||
|
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||||
|
`
|
||||||
|
|
||||||
|
// 测试数据
|
||||||
|
err := client.asyncInsert(context.Background(), insertQuery, true,
|
||||||
"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0)
|
"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0)
|
||||||
assert.NoError(t, err, "InsertIntoCandlesTable 应该成功执行")
|
assert.NoError(t, err, "InsertIntoCandlesTable 应该成功执行")
|
||||||
}
|
}
|
||||||
@@ -138,6 +365,22 @@ func TestQueryCandlesTable(t *testing.T) {
|
|||||||
err := client.Query(context.Background(), func() interface{} { return &Candle{} }, &results, query)
|
err := client.Query(context.Background(), func() interface{} { return &Candle{} }, &results, query)
|
||||||
assert.NoError(t, err, "QueryCandlesTable 应该成功执行")
|
assert.NoError(t, err, "QueryCandlesTable 应该成功执行")
|
||||||
assert.NotEmpty(t, results, "QueryCandlesTable 应该返回结果")
|
assert.NotEmpty(t, results, "QueryCandlesTable 应该返回结果")
|
||||||
|
for _, result := range results {
|
||||||
|
candle, ok := result.(*Candle)
|
||||||
|
assert.True(t, ok, "结果应该是 Candle 类型")
|
||||||
|
assert.NotNil(t, candle.Timestamp, "Timestamp 列不应该为 nil")
|
||||||
|
assert.NotNil(t, candle.Symbol, "Symbol 列不应该为 nil")
|
||||||
|
assert.NotNil(t, candle.Open, "Open 列不应该为 nil")
|
||||||
|
assert.NotNil(t, candle.High, "High 列不应该为 nil")
|
||||||
|
assert.NotNil(t, candle.Low, "Low 列不应该为 nil")
|
||||||
|
assert.NotNil(t, candle.Close, "Close 列不应该为 nil")
|
||||||
|
assert.NotNil(t, candle.Volume, "Volume 列不应该为 nil")
|
||||||
|
t.Logf("[%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
|
||||||
|
candle.Timestamp.String(),
|
||||||
|
*candle.Symbol,
|
||||||
|
*candle.Open, *candle.High, *candle.Low, *candle.Close, *candle.Volume,
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSelectCandlesTable(t *testing.T) {
|
func TestSelectCandlesTable(t *testing.T) {
|
||||||
@@ -159,6 +402,21 @@ func TestSelectCandlesTable(t *testing.T) {
|
|||||||
err := client.Select(context.Background(), &results, query)
|
err := client.Select(context.Background(), &results, query)
|
||||||
assert.NoError(t, err, "QueryCandlesTable 应该成功执行")
|
assert.NoError(t, err, "QueryCandlesTable 应该成功执行")
|
||||||
assert.NotEmpty(t, results, "QueryCandlesTable 应该返回结果")
|
assert.NotEmpty(t, results, "QueryCandlesTable 应该返回结果")
|
||||||
|
|
||||||
|
for _, result := range results {
|
||||||
|
assert.NotNil(t, result.Timestamp, "Timestamp 列不应该为 nil")
|
||||||
|
assert.NotNil(t, result.Symbol, "Symbol 列不应该为 nil")
|
||||||
|
assert.NotNil(t, result.Open, "Open 列不应该为 nil")
|
||||||
|
assert.NotNil(t, result.High, "High 列不应该为 nil")
|
||||||
|
assert.NotNil(t, result.Low, "Low 列不应该为 nil")
|
||||||
|
assert.NotNil(t, result.Close, "Close 列不应该为 nil")
|
||||||
|
assert.NotNil(t, result.Volume, "Volume 列不应该为 nil")
|
||||||
|
t.Logf("[%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
|
||||||
|
result.Timestamp.String(),
|
||||||
|
*result.Symbol,
|
||||||
|
*result.Open, *result.High, *result.Low, *result.Close, *result.Volume,
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestQueryRow(t *testing.T) {
|
func TestQueryRow(t *testing.T) {
|
||||||
@@ -172,7 +430,7 @@ func TestQueryRow(t *testing.T) {
|
|||||||
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
|
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||||
`
|
`
|
||||||
err := client.AsyncInsert(context.Background(), insertQuery, true,
|
err := client.asyncInsert(context.Background(), insertQuery, true,
|
||||||
"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0)
|
"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0)
|
||||||
assert.NoError(t, err, "数据插入失败")
|
assert.NoError(t, err, "数据插入失败")
|
||||||
|
|
||||||
@@ -182,13 +440,19 @@ func TestQueryRow(t *testing.T) {
|
|||||||
FROM candles
|
FROM candles
|
||||||
WHERE symbol = ?
|
WHERE symbol = ?
|
||||||
`
|
`
|
||||||
|
|
||||||
var result Candle
|
var result Candle
|
||||||
|
|
||||||
err = client.QueryRow(context.Background(), &result, query, "AAPL")
|
err = client.QueryRow(context.Background(), &result, query, "AAPL")
|
||||||
assert.NoError(t, err, "QueryRow 应该成功执行")
|
assert.NoError(t, err, "QueryRow 应该成功执行")
|
||||||
assert.Equal(t, "AAPL", result.Symbol, "symbol 列值应该为 AAPL")
|
assert.Equal(t, "AAPL", *result.Symbol, "symbol 列值应该为 AAPL")
|
||||||
assert.Equal(t, 100.5, result.Open, "open 列值应该为 100.5")
|
assert.Equal(t, 100.5, *result.Open, "open 列值应该为 100.5")
|
||||||
assert.Equal(t, 1500.0, result.Volume, "volume 列值应该为 1500.0")
|
assert.Equal(t, 1500.0, *result.Volume, "volume 列值应该为 1500.0")
|
||||||
|
t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
|
||||||
|
result.Timestamp.String(),
|
||||||
|
*result.Symbol,
|
||||||
|
*result.Open, *result.High, *result.Low, *result.Close, *result.Volume,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDropCandlesTable(t *testing.T) {
|
func TestDropCandlesTable(t *testing.T) {
|
||||||
@@ -234,23 +498,3 @@ func TestAggregateCandlesTable(t *testing.T) {
|
|||||||
assert.NoError(t, err, "AggregateCandlesTable 应该成功执行")
|
assert.NoError(t, err, "AggregateCandlesTable 应该成功执行")
|
||||||
assert.NotEmpty(t, results, "AggregateCandlesTable 应该返回结果")
|
assert.NotEmpty(t, results, "AggregateCandlesTable 应该返回结果")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBatchInsertCandlesTable(t *testing.T) {
|
|
||||||
client := createTestClient()
|
|
||||||
assert.NotNil(t, client)
|
|
||||||
|
|
||||||
createCandlesTable(client)
|
|
||||||
|
|
||||||
// 测试数据
|
|
||||||
data := [][]interface{}{
|
|
||||||
{"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0},
|
|
||||||
{"2023-10-01 12:01:00", "GOOG", 200.5, 205.0, 199.5, 202.0, 2500.0},
|
|
||||||
{"2023-10-01 12:02:00", "MSFT", 300.5, 305.0, 299.5, 302.0, 3500.0},
|
|
||||||
}
|
|
||||||
|
|
||||||
// 批量插入数据
|
|
||||||
err := client.BatchInsert(context.Background(), `
|
|
||||||
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
|
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?)`, data)
|
|
||||||
assert.NoError(t, err, "BatchInsertCandlesTable 应该成功执行")
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -72,9 +72,18 @@ var (
|
|||||||
// ErrBatchAppendFailed is returned when appending to a batch fails.
|
// ErrBatchAppendFailed is returned when appending to a batch fails.
|
||||||
ErrBatchAppendFailed = errors.InternalServer("BATCH_APPEND_FAILED", "batch append operation failed")
|
ErrBatchAppendFailed = errors.InternalServer("BATCH_APPEND_FAILED", "batch append operation failed")
|
||||||
|
|
||||||
|
// ErrBatchInsertFailed is returned when a batch insert operation fails.
|
||||||
|
ErrBatchInsertFailed = errors.InternalServer("BATCH_INSERT_FAILED", "batch insert operation failed")
|
||||||
|
|
||||||
// ErrInvalidDSN is returned when the data source name (DSN) is invalid.
|
// ErrInvalidDSN is returned when the data source name (DSN) is invalid.
|
||||||
ErrInvalidDSN = errors.InternalServer("INVALID_DSN", "invalid data source name")
|
ErrInvalidDSN = errors.InternalServer("INVALID_DSN", "invalid data source name")
|
||||||
|
|
||||||
// ErrInvalidProxyURL is returned when the proxy URL is invalid.
|
// ErrInvalidProxyURL is returned when the proxy URL is invalid.
|
||||||
ErrInvalidProxyURL = errors.InternalServer("INVALID_PROXY_URL", "invalid proxy URL")
|
ErrInvalidProxyURL = errors.InternalServer("INVALID_PROXY_URL", "invalid proxy URL")
|
||||||
|
|
||||||
|
// ErrPrepareInsertDataFailed is returned when preparing insert data fails.
|
||||||
|
ErrPrepareInsertDataFailed = errors.InternalServer("PREPARE_INSERT_DATA_FAILED", "failed to prepare insert data")
|
||||||
|
|
||||||
|
// ErrInvalidColumnData is returned when the column data type is invalid.
|
||||||
|
ErrInvalidColumnData = errors.InternalServer("INVALID_COLUMN_DATA", "invalid column data type")
|
||||||
)
|
)
|
||||||
|
|||||||
146
database/clickhouse/utils.go
Normal file
146
database/clickhouse/utils.go
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
package clickhouse
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"reflect"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"google.golang.org/protobuf/types/known/durationpb"
|
||||||
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
timeFormat = "2006-01-02 15:04:05.000000000"
|
||||||
|
)
|
||||||
|
|
||||||
|
func structToValueArray(input any) []any {
|
||||||
|
// 检查是否是指针类型,如果是则解引用
|
||||||
|
val := reflect.ValueOf(input)
|
||||||
|
if val.Kind() == reflect.Ptr {
|
||||||
|
val = val.Elem()
|
||||||
|
}
|
||||||
|
|
||||||
|
// 确保输入是结构体
|
||||||
|
if val.Kind() != reflect.Struct {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var values []any
|
||||||
|
for i := 0; i < val.NumField(); i++ {
|
||||||
|
value := val.Field(i).Interface()
|
||||||
|
|
||||||
|
switch v := value.(type) {
|
||||||
|
case *sql.NullString:
|
||||||
|
if v.Valid {
|
||||||
|
values = append(values, v.String)
|
||||||
|
} else {
|
||||||
|
values = append(values, nil)
|
||||||
|
}
|
||||||
|
case *sql.NullInt64:
|
||||||
|
if v.Valid {
|
||||||
|
values = append(values, v.Int64)
|
||||||
|
} else {
|
||||||
|
values = append(values, nil)
|
||||||
|
}
|
||||||
|
case *sql.NullFloat64:
|
||||||
|
if v.Valid {
|
||||||
|
values = append(values, v.Float64)
|
||||||
|
} else {
|
||||||
|
values = append(values, nil)
|
||||||
|
}
|
||||||
|
case *sql.NullBool:
|
||||||
|
if v.Valid {
|
||||||
|
values = append(values, v.Bool)
|
||||||
|
} else {
|
||||||
|
values = append(values, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
case *sql.NullTime:
|
||||||
|
if v != nil && v.Valid {
|
||||||
|
values = append(values, v.Time.Format(timeFormat))
|
||||||
|
} else {
|
||||||
|
values = append(values, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
case *time.Time:
|
||||||
|
if v != nil {
|
||||||
|
values = append(values, v.Format(timeFormat))
|
||||||
|
} else {
|
||||||
|
values = append(values, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
case time.Time:
|
||||||
|
// 处理 time.Time 类型
|
||||||
|
if !v.IsZero() {
|
||||||
|
values = append(values, v.Format(timeFormat))
|
||||||
|
} else {
|
||||||
|
values = append(values, nil) // 如果时间为零值,插入 NULL
|
||||||
|
}
|
||||||
|
|
||||||
|
case timestamppb.Timestamp:
|
||||||
|
// 处理 timestamppb.Timestamp 类型
|
||||||
|
if !v.IsValid() {
|
||||||
|
values = append(values, v.AsTime().Format(timeFormat))
|
||||||
|
} else {
|
||||||
|
values = append(values, nil) // 如果时间为零值,插入 NULL
|
||||||
|
}
|
||||||
|
|
||||||
|
case *timestamppb.Timestamp:
|
||||||
|
// 处理 *timestamppb.Timestamp 类型
|
||||||
|
if v != nil && v.IsValid() {
|
||||||
|
values = append(values, v.AsTime().Format(timeFormat))
|
||||||
|
} else {
|
||||||
|
values = append(values, nil) // 如果时间为零值,插入 NULL
|
||||||
|
}
|
||||||
|
|
||||||
|
case durationpb.Duration:
|
||||||
|
// 处理 timestamppb.Duration 类型
|
||||||
|
if v.AsDuration() != 0 {
|
||||||
|
values = append(values, v.AsDuration().String())
|
||||||
|
} else {
|
||||||
|
values = append(values, nil) // 如果时间为零值,插入 NULL
|
||||||
|
}
|
||||||
|
|
||||||
|
case *durationpb.Duration:
|
||||||
|
// 处理 *timestamppb.Duration 类型
|
||||||
|
if v != nil && v.AsDuration() != 0 {
|
||||||
|
values = append(values, v.AsDuration().String())
|
||||||
|
} else {
|
||||||
|
values = append(values, nil) // 如果时间为零值,插入 NULL
|
||||||
|
}
|
||||||
|
|
||||||
|
case []any:
|
||||||
|
// 处理切片类型
|
||||||
|
if len(v) > 0 {
|
||||||
|
for _, item := range v {
|
||||||
|
if item == nil {
|
||||||
|
values = append(values, nil)
|
||||||
|
} else {
|
||||||
|
values = append(values, item)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
values = append(values, nil) // 如果切片为空,插入 NULL
|
||||||
|
}
|
||||||
|
|
||||||
|
case [][]any:
|
||||||
|
// 处理二维切片类型
|
||||||
|
if len(v) > 0 {
|
||||||
|
for _, item := range v {
|
||||||
|
if len(item) > 0 {
|
||||||
|
values = append(values, item)
|
||||||
|
} else {
|
||||||
|
values = append(values, nil) // 如果子切片为空,插入 NULL
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
values = append(values, nil) // 如果二维切片为空,插入 NULL
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
values = append(values, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return values
|
||||||
|
}
|
||||||
83
database/clickhouse/utils_test.go
Normal file
83
database/clickhouse/utils_test.go
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
package clickhouse
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Ptr returns a pointer to the provided value.
|
||||||
|
func Ptr[T any](v T) *T {
|
||||||
|
return &v
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStructToValueArrayWithCandle(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
candle := Candle{
|
||||||
|
Timestamp: Ptr(now),
|
||||||
|
Symbol: Ptr("AAPL"),
|
||||||
|
Open: Ptr(100.5),
|
||||||
|
High: Ptr(105.0),
|
||||||
|
Low: Ptr(99.5),
|
||||||
|
Close: Ptr(102.0),
|
||||||
|
Volume: Ptr(1500.0),
|
||||||
|
}
|
||||||
|
|
||||||
|
values := structToValueArray(candle)
|
||||||
|
assert.NotNil(t, values, "Values should not be nil")
|
||||||
|
assert.Len(t, values, 7, "Expected 7 fields in the Candle struct")
|
||||||
|
assert.Equal(t, now.Format(timeFormat), values[0].(string), "Timestamp should match")
|
||||||
|
assert.Equal(t, *candle.Symbol, *(values[1].(*string)), "Symbol should match")
|
||||||
|
assert.Equal(t, *candle.Open, *values[2].(*float64), "Open price should match")
|
||||||
|
assert.Equal(t, *candle.High, *values[3].(*float64), "High price should match")
|
||||||
|
assert.Equal(t, *candle.Low, *values[4].(*float64), "Low price should match")
|
||||||
|
assert.Equal(t, *candle.Close, *values[5].(*float64), "Close price should match")
|
||||||
|
assert.Equal(t, *candle.Volume, *values[6].(*float64), "Volume should match")
|
||||||
|
|
||||||
|
t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
|
||||||
|
values[0],
|
||||||
|
*(values[1].(*string)),
|
||||||
|
*values[2].(*float64),
|
||||||
|
*values[3].(*float64),
|
||||||
|
*values[4].(*float64),
|
||||||
|
*values[5].(*float64),
|
||||||
|
*values[6].(*float64),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStructToValueArrayWithCandlePtr(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
candle := &Candle{
|
||||||
|
Timestamp: Ptr(now),
|
||||||
|
Symbol: Ptr("AAPL"),
|
||||||
|
Open: Ptr(100.5),
|
||||||
|
High: Ptr(105.0),
|
||||||
|
Low: Ptr(99.5),
|
||||||
|
Close: Ptr(102.0),
|
||||||
|
Volume: Ptr(1500.0),
|
||||||
|
}
|
||||||
|
|
||||||
|
values := structToValueArray(candle)
|
||||||
|
assert.NotNil(t, values, "Values should not be nil")
|
||||||
|
assert.Len(t, values, 7, "Expected 7 fields in the Candle struct")
|
||||||
|
assert.Equal(t, now.Format(timeFormat), values[0].(string), "Timestamp should match")
|
||||||
|
assert.Equal(t, *candle.Symbol, *(values[1].(*string)), "Symbol should match")
|
||||||
|
assert.Equal(t, *candle.Open, *values[2].(*float64), "Open price should match")
|
||||||
|
assert.Equal(t, *candle.High, *values[3].(*float64), "High price should match")
|
||||||
|
assert.Equal(t, *candle.Low, *values[4].(*float64), "Low price should match")
|
||||||
|
assert.Equal(t, *candle.Close, *values[5].(*float64), "Close price should match")
|
||||||
|
assert.Equal(t, *candle.Volume, *values[6].(*float64), "Volume should match")
|
||||||
|
|
||||||
|
t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
|
||||||
|
values[0],
|
||||||
|
*(values[1].(*string)),
|
||||||
|
*values[2].(*float64),
|
||||||
|
*values[3].(*float64),
|
||||||
|
*values[4].(*float64),
|
||||||
|
*values[5].(*float64),
|
||||||
|
*values[6].(*float64),
|
||||||
|
)
|
||||||
|
}
|
||||||
@@ -14,3 +14,19 @@
|
|||||||
- 动态映射(dynamic mapping)
|
- 动态映射(dynamic mapping)
|
||||||
- 显式映射(explicit mapping)
|
- 显式映射(explicit mapping)
|
||||||
- 严格映射(strict mappings)
|
- 严格映射(strict mappings)
|
||||||
|
|
||||||
|
## Docker部署
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker pull bitnami/elasticsearch:latest
|
||||||
|
|
||||||
|
docker run -itd \
|
||||||
|
--name elasticsearch \
|
||||||
|
-p 9200:9200 \
|
||||||
|
-p 9300:9300 \
|
||||||
|
-e ELASTICSEARCH_USERNAME=elastic \
|
||||||
|
-e ELASTICSEARCH_PASSWORD=elastic \
|
||||||
|
-e ELASTICSEARCH_NODE_NAME=elasticsearch-node-1 \
|
||||||
|
-e ELASTICSEARCH_CLUSTER_NAME=elasticsearch-cluster \
|
||||||
|
bitnami/elasticsearch:latest
|
||||||
|
```
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ func NewLogger(cfg *conf.Logger) log.Logger {
|
|||||||
fallthrough
|
fallthrough
|
||||||
case "text":
|
case "text":
|
||||||
loggerFormatter = &logrus.TextFormatter{
|
loggerFormatter = &logrus.TextFormatter{
|
||||||
|
ForceColors: cfg.Logrus.ForceColors,
|
||||||
DisableColors: cfg.Logrus.DisableColors,
|
DisableColors: cfg.Logrus.DisableColors,
|
||||||
DisableTimestamp: cfg.Logrus.DisableTimestamp,
|
DisableTimestamp: cfg.Logrus.DisableTimestamp,
|
||||||
TimestampFormat: cfg.Logrus.TimestampFormat,
|
TimestampFormat: cfg.Logrus.TimestampFormat,
|
||||||
|
|||||||
2
tag.bat
2
tag.bat
@@ -12,7 +12,7 @@ git tag database/ent/v0.0.10 --force
|
|||||||
git tag database/gorm/v0.0.10 --force
|
git tag database/gorm/v0.0.10 --force
|
||||||
git tag database/mongodb/v0.0.12 --force
|
git tag database/mongodb/v0.0.12 --force
|
||||||
git tag database/influxdb/v0.0.12 --force
|
git tag database/influxdb/v0.0.12 --force
|
||||||
git tag database/clickhouse/v0.0.12 --force
|
git tag database/clickhouse/v0.0.14 --force
|
||||||
git tag database/elasticsearch/v0.0.12 --force
|
git tag database/elasticsearch/v0.0.12 --force
|
||||||
git tag database/cassandra/v0.0.10 --force
|
git tag database/cassandra/v0.0.10 --force
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user