Compare commits

..

3 Commits

Author SHA1 Message Date
c2726db8a1 logrus添加支持彩虹日志的配置字段force_colors 2025-07-17 03:18:31 +08:00
Bobo
47c72651db feat: database. 2025-06-29 18:36:43 +08:00
Bobo
f267c19c73 feat: database. 2025-06-29 14:17:03 +08:00
10 changed files with 551 additions and 97 deletions

View File

@@ -191,6 +191,7 @@ type Logger_Logrus struct {
TimestampFormat string `protobuf:"bytes,3,opt,name=timestamp_format,json=timestampFormat,proto3" json:"timestamp_format,omitempty"` // 定义时间戳格式,例如:"2006-01-02 15:04:05" TimestampFormat string `protobuf:"bytes,3,opt,name=timestamp_format,json=timestampFormat,proto3" json:"timestamp_format,omitempty"` // 定义时间戳格式,例如:"2006-01-02 15:04:05"
DisableColors bool `protobuf:"varint,4,opt,name=disable_colors,json=disableColors,proto3" json:"disable_colors,omitempty"` // 不需要彩色日志 DisableColors bool `protobuf:"varint,4,opt,name=disable_colors,json=disableColors,proto3" json:"disable_colors,omitempty"` // 不需要彩色日志
DisableTimestamp bool `protobuf:"varint,5,opt,name=disable_timestamp,json=disableTimestamp,proto3" json:"disable_timestamp,omitempty"` // 不需要时间戳 DisableTimestamp bool `protobuf:"varint,5,opt,name=disable_timestamp,json=disableTimestamp,proto3" json:"disable_timestamp,omitempty"` // 不需要时间戳
ForceColors bool `protobuf:"varint,6,opt,name=force_colors,json=forceColors,proto3" json:"force_colors,omitempty"` // 是否开启彩色日志
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
} }
@@ -260,6 +261,13 @@ func (x *Logger_Logrus) GetDisableTimestamp() bool {
return false return false
} }
func (x *Logger_Logrus) GetForceColors() bool {
if x != nil {
return x.ForceColors
}
return false
}
// Fluent // Fluent
type Logger_Fluent struct { type Logger_Fluent struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
@@ -447,7 +455,7 @@ var File_conf_v1_kratos_conf_logger_proto protoreflect.FileDescriptor
const file_conf_v1_kratos_conf_logger_proto_rawDesc = "" + const file_conf_v1_kratos_conf_logger_proto_rawDesc = "" +
"\n" + "\n" +
" conf/v1/kratos_conf_logger.proto\x12\x04conf\"\xc4\a\n" + " conf/v1/kratos_conf_logger.proto\x12\x04conf\"\xe7\a\n" +
"\x06Logger\x12\x12\n" + "\x06Logger\x12\x12\n" +
"\x04type\x18\x01 \x01(\tR\x04type\x12'\n" + "\x04type\x18\x01 \x01(\tR\x04type\x12'\n" +
"\x03zap\x18\x02 \x01(\v2\x10.conf.Logger.ZapH\x00R\x03zap\x88\x01\x01\x120\n" + "\x03zap\x18\x02 \x01(\v2\x10.conf.Logger.ZapH\x00R\x03zap\x88\x01\x01\x120\n" +
@@ -461,13 +469,14 @@ const file_conf_v1_kratos_conf_logger_proto_rawDesc = "" +
"\bmax_size\x18\x03 \x01(\x05R\amaxSize\x12\x17\n" + "\bmax_size\x18\x03 \x01(\x05R\amaxSize\x12\x17\n" +
"\amax_age\x18\x04 \x01(\x05R\x06maxAge\x12\x1f\n" + "\amax_age\x18\x04 \x01(\x05R\x06maxAge\x12\x1f\n" +
"\vmax_backups\x18\x05 \x01(\x05R\n" + "\vmax_backups\x18\x05 \x01(\x05R\n" +
"maxBackups\x1a\xbb\x01\n" + "maxBackups\x1a\xde\x01\n" +
"\x06Logrus\x12\x14\n" + "\x06Logrus\x12\x14\n" +
"\x05level\x18\x01 \x01(\tR\x05level\x12\x1c\n" + "\x05level\x18\x01 \x01(\tR\x05level\x12\x1c\n" +
"\tformatter\x18\x02 \x01(\tR\tformatter\x12)\n" + "\tformatter\x18\x02 \x01(\tR\tformatter\x12)\n" +
"\x10timestamp_format\x18\x03 \x01(\tR\x0ftimestampFormat\x12%\n" + "\x10timestamp_format\x18\x03 \x01(\tR\x0ftimestampFormat\x12%\n" +
"\x0edisable_colors\x18\x04 \x01(\bR\rdisableColors\x12+\n" + "\x0edisable_colors\x18\x04 \x01(\bR\rdisableColors\x12+\n" +
"\x11disable_timestamp\x18\x05 \x01(\bR\x10disableTimestamp\x1a$\n" + "\x11disable_timestamp\x18\x05 \x01(\bR\x10disableTimestamp\x12!\n" +
"\fforce_colors\x18\x06 \x01(\bR\vforceColors\x1a$\n" +
"\x06Fluent\x12\x1a\n" + "\x06Fluent\x12\x1a\n" +
"\bendpoint\x18\x01 \x01(\tR\bendpoint\x1a\x82\x01\n" + "\bendpoint\x18\x01 \x01(\tR\bendpoint\x1a\x82\x01\n" +
"\x06Aliyun\x12\x1a\n" + "\x06Aliyun\x12\x1a\n" +

View File

@@ -22,6 +22,7 @@ message Logger {
string timestamp_format = 3; // 定义时间戳格式,例如:"2006-01-02 15:04:05" string timestamp_format = 3; // 定义时间戳格式,例如:"2006-01-02 15:04:05"
bool disable_colors = 4; // 不需要彩色日志 bool disable_colors = 4; // 不需要彩色日志
bool disable_timestamp = 5; // 不需要时间戳 bool disable_timestamp = 5; // 不需要时间戳
bool force_colors = 6; // 是否开启彩色日志
} }
// Fluent // Fluent

View File

@@ -167,14 +167,15 @@ func appendStructToBatch(batch driverV2.Batch, obj interface{}, columns []string
field := t.Field(j) field := t.Field(j)
// 检查ch标签 // 检查ch标签
if tag := field.Tag.Get("ch"); tag == col { if tag := field.Tag.Get("ch"); strings.TrimSpace(tag) == col {
values[i] = v.Field(j).Interface() values[i] = v.Field(j).Interface()
found = true found = true
break break
} }
// 检查json标签 // 检查json标签
if tag := field.Tag.Get("json"); tag == col { jsonTags := strings.Split(field.Tag.Get("json"), ",")
if len(jsonTags) > 0 && strings.TrimSpace(jsonTags[0]) == col {
values[i] = v.Field(j).Interface() values[i] = v.Field(j).Interface()
found = true found = true
break break

View File

@@ -8,13 +8,11 @@ import (
"net/url" "net/url"
"reflect" "reflect"
"strings" "strings"
"time"
clickhouseV2 "github.com/ClickHouse/clickhouse-go/v2" clickhouseV2 "github.com/ClickHouse/clickhouse-go/v2"
driverV2 "github.com/ClickHouse/clickhouse-go/v2/lib/driver" driverV2 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1" conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
"github.com/tx7do/kratos-bootstrap/utils" "github.com/tx7do/kratos-bootstrap/utils"
) )
@@ -328,92 +326,48 @@ func (c *Client) prepareInsertData(data any) (string, string, []any, error) {
val = val.Elem() val = val.Elem()
typ := val.Type() typ := val.Type()
var columns []string columns := make([]string, 0, typ.NumField())
var placeholders []string placeholders := make([]string, 0, typ.NumField())
var values []any values := make([]any, 0, typ.NumField())
values = structToValueArray(data)
for i := 0; i < typ.NumField(); i++ { for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i) field := typ.Field(i)
value := val.Field(i).Interface()
// 优先获取 `cn` 标签,其次获取 `json` 标签,最后使用字段名 // 优先获取 `ch` 标签,其次获取 `json` 标签,最后使用字段名
columnName := field.Tag.Get("cn") columnName := field.Tag.Get("ch")
if columnName == "" { if columnName == "" {
columnName = field.Tag.Get("json") jsonTag := field.Tag.Get("json")
if jsonTag != "" {
tags := strings.Split(jsonTag, ",") // 只取逗号前的部分
if len(tags) > 0 {
columnName = tags[0]
}
}
} }
if columnName == "" { if columnName == "" {
columnName = field.Name columnName = field.Name
} }
//columnName = strings.TrimSpace(columnName)
columns = append(columns, columnName) columns = append(columns, columnName)
placeholders = append(placeholders, "?") placeholders = append(placeholders, "?")
switch v := value.(type) {
case *sql.NullString:
if v.Valid {
values = append(values, v.String)
} else {
values = append(values, nil)
}
case *sql.NullInt64:
if v.Valid {
values = append(values, v.Int64)
} else {
values = append(values, nil)
}
case *sql.NullFloat64:
if v.Valid {
values = append(values, v.Float64)
} else {
values = append(values, nil)
}
case *sql.NullBool:
if v.Valid {
values = append(values, v.Bool)
} else {
values = append(values, nil)
}
case *sql.NullTime:
if v != nil && v.Valid {
values = append(values, v.Time.Format("2006-01-02 15:04:05.000000000"))
} else {
values = append(values, nil)
}
case *time.Time:
if v != nil {
values = append(values, v.Format("2006-01-02 15:04:05.000000000"))
} else {
values = append(values, nil)
}
case time.Time:
// 处理 time.Time 类型
if !v.IsZero() {
values = append(values, v.Format("2006-01-02 15:04:05.000000000"))
} else {
values = append(values, nil) // 如果时间为零值,插入 NULL
}
default:
values = append(values, v)
}
} }
return strings.Join(columns, ", "), strings.Join(placeholders, ", "), values, nil return strings.Join(columns, ", "), strings.Join(placeholders, ", "), values, nil
} }
// Insert 插入数据到指定表 // Insert 插入数据到指定表
func (c *Client) Insert(ctx context.Context, tableName string, data any) error { func (c *Client) Insert(ctx context.Context, tableName string, in any) error {
if c.conn == nil { if c.conn == nil {
c.log.Error("clickhouse client is not initialized") c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized return ErrClientNotInitialized
} }
columns, placeholders, values, err := c.prepareInsertData(data) columns, placeholders, values, err := c.prepareInsertData(in)
if err != nil { if err != nil {
c.log.Errorf("prepare insert data failed: %v", err) c.log.Errorf("prepare insert in failed: %v", err)
return ErrPrepareInsertDataFailed return ErrPrepareInsertDataFailed
} }
@@ -425,7 +379,7 @@ func (c *Client) Insert(ctx context.Context, tableName string, data any) error {
) )
// 执行插入操作 // 执行插入操作
if err := c.conn.Exec(ctx, query, values...); err != nil { if err = c.conn.Exec(ctx, query, values...); err != nil {
c.log.Errorf("insert failed: %v", err) c.log.Errorf("insert failed: %v", err)
return ErrInsertFailed return ErrInsertFailed
} }
@@ -467,7 +421,7 @@ func (c *Client) InsertMany(ctx context.Context, tableName string, data []any) e
} }
// 构造 SQL 语句 // 构造 SQL 语句
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s", query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
tableName, tableName,
columns, columns,
strings.Join(placeholders, ", "), strings.Join(placeholders, ", "),
@@ -483,7 +437,37 @@ func (c *Client) InsertMany(ctx context.Context, tableName string, data []any) e
} }
// AsyncInsert 异步插入数据 // AsyncInsert 异步插入数据
func (c *Client) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error { func (c *Client) AsyncInsert(ctx context.Context, tableName string, data any, wait bool) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
// 准备插入数据
columns, placeholders, values, err := c.prepareInsertData(data)
if err != nil {
c.log.Errorf("prepare insert data failed: %v", err)
return ErrPrepareInsertDataFailed
}
// 构造 SQL 语句
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
tableName,
columns,
placeholders,
)
// 执行异步插入
if err = c.asyncInsert(ctx, query, wait, values...); err != nil {
c.log.Errorf("async insert failed: %v", err)
return ErrAsyncInsertFailed
}
return nil
}
// asyncInsert 异步插入数据
func (c *Client) asyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
if c.conn == nil { if c.conn == nil {
c.log.Error("clickhouse client is not initialized") c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized return ErrClientNotInitialized
@@ -497,8 +481,104 @@ func (c *Client) AsyncInsert(ctx context.Context, query string, wait bool, args
return nil return nil
} }
// AsyncInsertMany 批量异步插入数据
func (c *Client) AsyncInsertMany(ctx context.Context, tableName string, data []any, wait bool) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if len(data) == 0 {
c.log.Error("data slice is empty")
return ErrInvalidColumnData
}
// 准备插入数据的列名和占位符
var columns string
var placeholders []string
var values []any
for _, item := range data {
itemColumns, itemPlaceholders, itemValues, err := c.prepareInsertData(item)
if err != nil {
c.log.Errorf("prepare insert data failed: %v", err)
return ErrPrepareInsertDataFailed
}
if columns == "" {
columns = itemColumns
} else if columns != itemColumns {
c.log.Error("data items have inconsistent columns")
return ErrInvalidColumnData
}
placeholders = append(placeholders, fmt.Sprintf("(%s)", itemPlaceholders))
values = append(values, itemValues...)
}
// 构造 SQL 语句
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
tableName,
columns,
strings.Join(placeholders, ", "),
)
// 执行异步插入操作
if err := c.asyncInsert(ctx, query, wait, values...); err != nil {
c.log.Errorf("batch insert failed: %v", err)
return err
}
return nil
}
// BatchInsert 批量插入数据 // BatchInsert 批量插入数据
func (c *Client) BatchInsert(ctx context.Context, query string, data [][]any) error { func (c *Client) BatchInsert(ctx context.Context, tableName string, data []any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if len(data) == 0 {
c.log.Error("data slice is empty")
return ErrInvalidColumnData
}
// 准备插入数据的列名和占位符
var columns string
var values [][]any
for _, item := range data {
itemColumns, _, itemValues, err := c.prepareInsertData(item)
if err != nil {
c.log.Errorf("prepare insert data failed: %v", err)
return ErrPrepareInsertDataFailed
}
if columns == "" {
columns = itemColumns
} else if columns != itemColumns {
c.log.Error("data items have inconsistent columns")
return ErrInvalidColumnData
}
values = append(values, itemValues)
}
// 构造 SQL 语句
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES", tableName, columns)
// 调用 batchExec 方法执行批量插入
if err := c.batchExec(ctx, query, values); err != nil {
c.log.Errorf("batch insert failed: %v", err)
return ErrBatchInsertFailed
}
return nil
}
// batchExec 执行批量操作
func (c *Client) batchExec(ctx context.Context, query string, data [][]any) error {
batch, err := c.conn.PrepareBatch(ctx, query) batch, err := c.conn.PrepareBatch(ctx, query)
if err != nil { if err != nil {
c.log.Errorf("failed to prepare batch: %v", err) c.log.Errorf("failed to prepare batch: %v", err)
@@ -520,8 +600,8 @@ func (c *Client) BatchInsert(ctx context.Context, query string, data [][]any) er
return nil return nil
} }
// BatchInsertStructs 批量插入结构体数据 // BatchStructs 批量插入结构体数据
func (c *Client) BatchInsertStructs(ctx context.Context, query string, data []any) error { func (c *Client) BatchStructs(ctx context.Context, query string, data []any) error {
if c.conn == nil { if c.conn == nil {
c.log.Error("clickhouse client is not initialized") c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized return ErrClientNotInitialized

View File

@@ -10,10 +10,6 @@ import (
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1" conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
) )
func Ptr[T any](v T) *T {
return &v
}
type Candle struct { type Candle struct {
Timestamp *time.Time `json:"timestamp" ch:"timestamp"` Timestamp *time.Time `json:"timestamp" ch:"timestamp"`
Symbol *string `json:"symbol" ch:"symbol"` Symbol *string `json:"symbol" ch:"symbol"`
@@ -84,9 +80,8 @@ func TestInsertCandlesTable(t *testing.T) {
createCandlesTable(client) createCandlesTable(client)
// 测试数据 // 测试数据
now := time.Now()
candle := &Candle{ candle := &Candle{
Timestamp: &now, Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"), Symbol: Ptr("AAPL"),
Open: Ptr(100.5), Open: Ptr(100.5),
High: Ptr(105.0), High: Ptr(105.0),
@@ -107,10 +102,9 @@ func TestInsertManyCandlesTable(t *testing.T) {
createCandlesTable(client) createCandlesTable(client)
// 测试数据 // 测试数据
now := time.Now()
data := []any{ data := []any{
&Candle{ &Candle{
Timestamp: &now, Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"), Symbol: Ptr("AAPL"),
Open: Ptr(100.5), Open: Ptr(100.5),
High: Ptr(105.0), High: Ptr(105.0),
@@ -119,7 +113,7 @@ func TestInsertManyCandlesTable(t *testing.T) {
Volume: Ptr(1500.0), Volume: Ptr(1500.0),
}, },
&Candle{ &Candle{
Timestamp: &now, Timestamp: Ptr(time.Now()),
Symbol: Ptr("GOOG"), Symbol: Ptr("GOOG"),
Open: Ptr(200.5), Open: Ptr(200.5),
High: Ptr(205.0), High: Ptr(205.0),
@@ -134,7 +128,93 @@ func TestInsertManyCandlesTable(t *testing.T) {
assert.NoError(t, err, "InsertManyCandlesTable 应该成功执行") assert.NoError(t, err, "InsertManyCandlesTable 应该成功执行")
} }
func TestBatchInsertCandlesTable(t *testing.T) { func TestAsyncInsertCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 测试数据
candle := &Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("BTC/USD"),
Open: Ptr(30000.0),
High: Ptr(31000.0),
Low: Ptr(29000.0),
Close: Ptr(30500.0),
Volume: Ptr(500.0),
}
// 异步插入数据
err := client.AsyncInsert(context.Background(), "candles", candle, true)
assert.NoError(t, err, "AsyncInsert 方法应该成功执行")
// 验证插入结果
query := `
SELECT timestamp, symbol, open, high, low, close, volume
FROM candles
WHERE symbol = ?
`
var result Candle
err = client.QueryRow(context.Background(), &result, query, "BTC/USD")
assert.NoError(t, err, "QueryRow 应该成功执行")
assert.Equal(t, "BTC/USD", *result.Symbol, "symbol 列值应该为 BTC/USD")
assert.Equal(t, 30500.0, *result.Close, "close 列值应该为 30500.0")
assert.Equal(t, 500.0, *result.Volume, "volume 列值应该为 500.0")
}
func TestAsyncInsertManyCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 测试数据
data := []any{
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("GOOG"),
Open: Ptr(200.5),
High: Ptr(205.0),
Low: Ptr(199.5),
Close: Ptr(202.0),
Volume: Ptr(2500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("MSFT"),
Open: Ptr(300.5),
High: Ptr(305.0),
Low: Ptr(299.5),
Close: Ptr(302.0),
Volume: Ptr(3500.0),
},
}
// 批量插入数据
err := client.AsyncInsertMany(context.Background(), "candles", data, true)
assert.NoError(t, err, "AsyncInsertMany 方法应该成功执行")
// 验证插入结果
query := `
SELECT timestamp, symbol, open, high, low, close, volume
FROM candles
`
var results []Candle
err = client.Select(context.Background(), &results, query)
assert.NoError(t, err, "查询数据应该成功执行")
}
func TestInternalBatchExecCandlesTable(t *testing.T) {
client := createTestClient() client := createTestClient()
assert.NotNil(t, client) assert.NotNil(t, client)
@@ -154,11 +234,62 @@ func TestBatchInsertCandlesTable(t *testing.T) {
} }
// 批量插入数据 // 批量插入数据
err := client.BatchInsert(context.Background(), insertQuery, data) err := client.batchExec(context.Background(), insertQuery, data)
assert.NoError(t, err, "BatchInsertCandlesTable 应该成功执行") assert.NoError(t, err, "batchExec 应该成功执行")
} }
func TestBatchInsertStructsCandlesTable(t *testing.T) { func TestBatchInsertCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 测试数据
data := []any{
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("GOOG"),
Open: Ptr(200.5),
High: Ptr(205.0),
Low: Ptr(199.5),
Close: Ptr(202.0),
Volume: Ptr(2500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("MSFT"),
Open: Ptr(300.5),
High: Ptr(305.0),
Low: Ptr(299.5),
Close: Ptr(302.0),
Volume: Ptr(3500.0),
},
}
// 批量插入数据
err := client.BatchInsert(context.Background(), "candles", data)
assert.NoError(t, err, "BatchInsert 方法应该成功执行")
// 验证插入结果
query := `
SELECT timestamp, symbol, open, high, low, close, volume
FROM candles
`
var results []Candle
err = client.Select(context.Background(), &results, query)
assert.NoError(t, err, "查询数据应该成功执行")
}
func TestBatchStructsCandlesTable(t *testing.T) {
client := createTestClient() client := createTestClient()
assert.NotNil(t, client) assert.NotNil(t, client)
@@ -171,10 +302,9 @@ func TestBatchInsertStructsCandlesTable(t *testing.T) {
` `
// 测试数据 // 测试数据
now := time.Now()
data := []any{ data := []any{
&Candle{ &Candle{
Timestamp: &now, Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"), Symbol: Ptr("AAPL"),
Open: Ptr(100.5), Open: Ptr(100.5),
High: Ptr(105.0), High: Ptr(105.0),
@@ -183,7 +313,7 @@ func TestBatchInsertStructsCandlesTable(t *testing.T) {
Volume: Ptr(1500.0), Volume: Ptr(1500.0),
}, },
&Candle{ &Candle{
Timestamp: &now, Timestamp: Ptr(time.Now()),
Symbol: Ptr("GOOG"), Symbol: Ptr("GOOG"),
Open: Ptr(200.5), Open: Ptr(200.5),
High: Ptr(205.0), High: Ptr(205.0),
@@ -194,11 +324,11 @@ func TestBatchInsertStructsCandlesTable(t *testing.T) {
} }
// 批量插入数据 // 批量插入数据
err := client.BatchInsertStructs(context.Background(), insertQuery, data) err := client.BatchStructs(context.Background(), insertQuery, data)
assert.NoError(t, err, "BatchInsertStructsCandlesTable 应该成功执行") assert.NoError(t, err, "BatchStructsCandlesTable 应该成功执行")
} }
func TestAsyncInsertIntoCandlesTable(t *testing.T) { func TestInternalAsyncInsertIntoCandlesTable(t *testing.T) {
client := createTestClient() client := createTestClient()
assert.NotNil(t, client) assert.NotNil(t, client)
@@ -211,7 +341,7 @@ func TestAsyncInsertIntoCandlesTable(t *testing.T) {
` `
// 测试数据 // 测试数据
err := client.AsyncInsert(context.Background(), insertQuery, true, err := client.asyncInsert(context.Background(), insertQuery, true,
"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0) "2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0)
assert.NoError(t, err, "InsertIntoCandlesTable 应该成功执行") assert.NoError(t, err, "InsertIntoCandlesTable 应该成功执行")
} }
@@ -300,7 +430,7 @@ func TestQueryRow(t *testing.T) {
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume) INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?)
` `
err := client.AsyncInsert(context.Background(), insertQuery, true, err := client.asyncInsert(context.Background(), insertQuery, true,
"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0) "2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0)
assert.NoError(t, err, "数据插入失败") assert.NoError(t, err, "数据插入失败")

View File

@@ -72,6 +72,9 @@ var (
// ErrBatchAppendFailed is returned when appending to a batch fails. // ErrBatchAppendFailed is returned when appending to a batch fails.
ErrBatchAppendFailed = errors.InternalServer("BATCH_APPEND_FAILED", "batch append operation failed") ErrBatchAppendFailed = errors.InternalServer("BATCH_APPEND_FAILED", "batch append operation failed")
// ErrBatchInsertFailed is returned when a batch insert operation fails.
ErrBatchInsertFailed = errors.InternalServer("BATCH_INSERT_FAILED", "batch insert operation failed")
// ErrInvalidDSN is returned when the data source name (DSN) is invalid. // ErrInvalidDSN is returned when the data source name (DSN) is invalid.
ErrInvalidDSN = errors.InternalServer("INVALID_DSN", "invalid data source name") ErrInvalidDSN = errors.InternalServer("INVALID_DSN", "invalid data source name")

View File

@@ -0,0 +1,146 @@
package clickhouse
import (
"database/sql"
"reflect"
"time"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
timeFormat = "2006-01-02 15:04:05.000000000"
)
func structToValueArray(input any) []any {
// 检查是否是指针类型,如果是则解引用
val := reflect.ValueOf(input)
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
// 确保输入是结构体
if val.Kind() != reflect.Struct {
return nil
}
var values []any
for i := 0; i < val.NumField(); i++ {
value := val.Field(i).Interface()
switch v := value.(type) {
case *sql.NullString:
if v.Valid {
values = append(values, v.String)
} else {
values = append(values, nil)
}
case *sql.NullInt64:
if v.Valid {
values = append(values, v.Int64)
} else {
values = append(values, nil)
}
case *sql.NullFloat64:
if v.Valid {
values = append(values, v.Float64)
} else {
values = append(values, nil)
}
case *sql.NullBool:
if v.Valid {
values = append(values, v.Bool)
} else {
values = append(values, nil)
}
case *sql.NullTime:
if v != nil && v.Valid {
values = append(values, v.Time.Format(timeFormat))
} else {
values = append(values, nil)
}
case *time.Time:
if v != nil {
values = append(values, v.Format(timeFormat))
} else {
values = append(values, nil)
}
case time.Time:
// 处理 time.Time 类型
if !v.IsZero() {
values = append(values, v.Format(timeFormat))
} else {
values = append(values, nil) // 如果时间为零值,插入 NULL
}
case timestamppb.Timestamp:
// 处理 timestamppb.Timestamp 类型
if !v.IsValid() {
values = append(values, v.AsTime().Format(timeFormat))
} else {
values = append(values, nil) // 如果时间为零值,插入 NULL
}
case *timestamppb.Timestamp:
// 处理 *timestamppb.Timestamp 类型
if v != nil && v.IsValid() {
values = append(values, v.AsTime().Format(timeFormat))
} else {
values = append(values, nil) // 如果时间为零值,插入 NULL
}
case durationpb.Duration:
// 处理 timestamppb.Duration 类型
if v.AsDuration() != 0 {
values = append(values, v.AsDuration().String())
} else {
values = append(values, nil) // 如果时间为零值,插入 NULL
}
case *durationpb.Duration:
// 处理 *timestamppb.Duration 类型
if v != nil && v.AsDuration() != 0 {
values = append(values, v.AsDuration().String())
} else {
values = append(values, nil) // 如果时间为零值,插入 NULL
}
case []any:
// 处理切片类型
if len(v) > 0 {
for _, item := range v {
if item == nil {
values = append(values, nil)
} else {
values = append(values, item)
}
}
} else {
values = append(values, nil) // 如果切片为空,插入 NULL
}
case [][]any:
// 处理二维切片类型
if len(v) > 0 {
for _, item := range v {
if len(item) > 0 {
values = append(values, item)
} else {
values = append(values, nil) // 如果子切片为空,插入 NULL
}
}
} else {
values = append(values, nil) // 如果二维切片为空,插入 NULL
}
default:
values = append(values, v)
}
}
return values
}

View File

@@ -0,0 +1,83 @@
package clickhouse
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// Ptr returns a pointer to the provided value.
func Ptr[T any](v T) *T {
return &v
}
func TestStructToValueArrayWithCandle(t *testing.T) {
now := time.Now()
candle := Candle{
Timestamp: Ptr(now),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
}
values := structToValueArray(candle)
assert.NotNil(t, values, "Values should not be nil")
assert.Len(t, values, 7, "Expected 7 fields in the Candle struct")
assert.Equal(t, now.Format(timeFormat), values[0].(string), "Timestamp should match")
assert.Equal(t, *candle.Symbol, *(values[1].(*string)), "Symbol should match")
assert.Equal(t, *candle.Open, *values[2].(*float64), "Open price should match")
assert.Equal(t, *candle.High, *values[3].(*float64), "High price should match")
assert.Equal(t, *candle.Low, *values[4].(*float64), "Low price should match")
assert.Equal(t, *candle.Close, *values[5].(*float64), "Close price should match")
assert.Equal(t, *candle.Volume, *values[6].(*float64), "Volume should match")
t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
values[0],
*(values[1].(*string)),
*values[2].(*float64),
*values[3].(*float64),
*values[4].(*float64),
*values[5].(*float64),
*values[6].(*float64),
)
}
func TestStructToValueArrayWithCandlePtr(t *testing.T) {
now := time.Now()
candle := &Candle{
Timestamp: Ptr(now),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
}
values := structToValueArray(candle)
assert.NotNil(t, values, "Values should not be nil")
assert.Len(t, values, 7, "Expected 7 fields in the Candle struct")
assert.Equal(t, now.Format(timeFormat), values[0].(string), "Timestamp should match")
assert.Equal(t, *candle.Symbol, *(values[1].(*string)), "Symbol should match")
assert.Equal(t, *candle.Open, *values[2].(*float64), "Open price should match")
assert.Equal(t, *candle.High, *values[3].(*float64), "High price should match")
assert.Equal(t, *candle.Low, *values[4].(*float64), "Low price should match")
assert.Equal(t, *candle.Close, *values[5].(*float64), "Close price should match")
assert.Equal(t, *candle.Volume, *values[6].(*float64), "Volume should match")
t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
values[0],
*(values[1].(*string)),
*values[2].(*float64),
*values[3].(*float64),
*values[4].(*float64),
*values[5].(*float64),
*values[6].(*float64),
)
}

View File

@@ -26,6 +26,7 @@ func NewLogger(cfg *conf.Logger) log.Logger {
fallthrough fallthrough
case "text": case "text":
loggerFormatter = &logrus.TextFormatter{ loggerFormatter = &logrus.TextFormatter{
ForceColors: cfg.Logrus.ForceColors,
DisableColors: cfg.Logrus.DisableColors, DisableColors: cfg.Logrus.DisableColors,
DisableTimestamp: cfg.Logrus.DisableTimestamp, DisableTimestamp: cfg.Logrus.DisableTimestamp,
TimestampFormat: cfg.Logrus.TimestampFormat, TimestampFormat: cfg.Logrus.TimestampFormat,

View File

@@ -12,7 +12,7 @@ git tag database/ent/v0.0.10 --force
git tag database/gorm/v0.0.10 --force git tag database/gorm/v0.0.10 --force
git tag database/mongodb/v0.0.12 --force git tag database/mongodb/v0.0.12 --force
git tag database/influxdb/v0.0.12 --force git tag database/influxdb/v0.0.12 --force
git tag database/clickhouse/v0.0.13 --force git tag database/clickhouse/v0.0.14 --force
git tag database/elasticsearch/v0.0.12 --force git tag database/elasticsearch/v0.0.12 --force
git tag database/cassandra/v0.0.10 --force git tag database/cassandra/v0.0.10 --force