Compare commits

...

5 Commits

| Author | SHA1 | Message | Date |
| ------ | ---- | ------- | ---- |
|  | c2726db8a1 | logrus: add a force_colors config field to support colored log output | 2025-07-17 03:18:31 +08:00 |
| Bobo | 47c72651db | feat: database. | 2025-06-29 18:36:43 +08:00 |
| Bobo | f267c19c73 | feat: database. | 2025-06-29 14:17:03 +08:00 |
| Bobo | 8c017a34e0 | feat: database. | 2025-06-29 11:12:16 +08:00 |
| Bobo | ac6f0d1987 | feat: database. | 2025-06-29 09:41:13 +08:00 |
11 changed files with 862 additions and 74 deletions

View File

@@ -191,6 +191,7 @@ type Logger_Logrus struct {
TimestampFormat string `protobuf:"bytes,3,opt,name=timestamp_format,json=timestampFormat,proto3" json:"timestamp_format,omitempty"` // timestamp format, e.g. "2006-01-02 15:04:05"
DisableColors bool `protobuf:"varint,4,opt,name=disable_colors,json=disableColors,proto3" json:"disable_colors,omitempty"` // disable colored log output
DisableTimestamp bool `protobuf:"varint,5,opt,name=disable_timestamp,json=disableTimestamp,proto3" json:"disable_timestamp,omitempty"` // disable timestamps
ForceColors bool `protobuf:"varint,6,opt,name=force_colors,json=forceColors,proto3" json:"force_colors,omitempty"` // force colored log output
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -260,6 +261,13 @@ func (x *Logger_Logrus) GetDisableTimestamp() bool {
return false
}
func (x *Logger_Logrus) GetForceColors() bool {
if x != nil {
return x.ForceColors
}
return false
}
// Fluent
type Logger_Fluent struct {
state protoimpl.MessageState `protogen:"open.v1"`
@@ -447,7 +455,7 @@ var File_conf_v1_kratos_conf_logger_proto protoreflect.FileDescriptor
const file_conf_v1_kratos_conf_logger_proto_rawDesc = "" +
"\n" +
" conf/v1/kratos_conf_logger.proto\x12\x04conf\"\xc4\a\n" +
" conf/v1/kratos_conf_logger.proto\x12\x04conf\"\xe7\a\n" +
"\x06Logger\x12\x12\n" +
"\x04type\x18\x01 \x01(\tR\x04type\x12'\n" +
"\x03zap\x18\x02 \x01(\v2\x10.conf.Logger.ZapH\x00R\x03zap\x88\x01\x01\x120\n" +
@@ -461,13 +469,14 @@ const file_conf_v1_kratos_conf_logger_proto_rawDesc = "" +
"\bmax_size\x18\x03 \x01(\x05R\amaxSize\x12\x17\n" +
"\amax_age\x18\x04 \x01(\x05R\x06maxAge\x12\x1f\n" +
"\vmax_backups\x18\x05 \x01(\x05R\n" +
"maxBackups\x1a\xbb\x01\n" +
"maxBackups\x1a\xde\x01\n" +
"\x06Logrus\x12\x14\n" +
"\x05level\x18\x01 \x01(\tR\x05level\x12\x1c\n" +
"\tformatter\x18\x02 \x01(\tR\tformatter\x12)\n" +
"\x10timestamp_format\x18\x03 \x01(\tR\x0ftimestampFormat\x12%\n" +
"\x0edisable_colors\x18\x04 \x01(\bR\rdisableColors\x12+\n" +
"\x11disable_timestamp\x18\x05 \x01(\bR\x10disableTimestamp\x1a$\n" +
"\x11disable_timestamp\x18\x05 \x01(\bR\x10disableTimestamp\x12!\n" +
"\fforce_colors\x18\x06 \x01(\bR\vforceColors\x1a$\n" +
"\x06Fluent\x12\x1a\n" +
"\bendpoint\x18\x01 \x01(\tR\bendpoint\x1a\x82\x01\n" +
"\x06Aliyun\x12\x1a\n" +

View File

@@ -22,6 +22,7 @@ message Logger {
string timestamp_format = 3; // timestamp format, e.g. "2006-01-02 15:04:05"
bool disable_colors = 4; // disable colored log output
bool disable_timestamp = 5; // disable timestamps
bool force_colors = 6; // force colored log output
}
// Fluent

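The new `force_colors` field maps onto logrus's `TextFormatter.ForceColors`, which forces ANSI color output even when the writer is not a terminal (for example when logs are piped into a collector). Below is a minimal sketch of wiring the generated config type into a formatter, mirroring the logger/logrus change further down in this diff; the helper function name is illustrative only:

```go
package logger

import (
	"github.com/sirupsen/logrus"

	conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
)

// newTextFormatter builds a logrus text formatter from the generated config.
// Illustrative sketch only; the real wiring lives in the bootstrap logger package.
func newTextFormatter(cfg *conf.Logger_Logrus) *logrus.TextFormatter {
	return &logrus.TextFormatter{
		ForceColors:      cfg.GetForceColors(), // new field added in this change
		DisableColors:    cfg.GetDisableColors(),
		DisableTimestamp: cfg.GetDisableTimestamp(),
		TimestampFormat:  cfg.GetTimestampFormat(),
	}
}
```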
View File

@@ -167,14 +167,15 @@ func appendStructToBatch(batch driverV2.Batch, obj interface{}, columns []string
field := t.Field(j)
// check the `ch` tag
if tag := field.Tag.Get("ch"); tag == col {
if tag := field.Tag.Get("ch"); strings.TrimSpace(tag) == col {
values[i] = v.Field(j).Interface()
found = true
break
}
// check the `json` tag
if tag := field.Tag.Get("json"); tag == col {
jsonTags := strings.Split(field.Tag.Get("json"), ",")
if len(jsonTags) > 0 && strings.TrimSpace(jsonTags[0]) == col {
values[i] = v.Field(j).Interface()
found = true
break

View File

@@ -4,13 +4,15 @@ import (
"context"
"crypto/tls"
"database/sql"
"fmt"
"net/url"
"reflect"
"strings"
clickhouseV2 "github.com/ClickHouse/clickhouse-go/v2"
driverV2 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/go-kratos/kratos/v2/log"
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
"github.com/tx7do/kratos-bootstrap/utils"
)
@@ -229,7 +231,7 @@ func (c *Client) CheckConnection(ctx context.Context) error {
}
// Query runs a query and scans the result rows
func (c *Client) Query(ctx context.Context, creator Creator, results *[]any, query string, args ...interface{}) error {
func (c *Client) Query(ctx context.Context, creator Creator, results *[]any, query string, args ...any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
@@ -269,7 +271,7 @@ func (c *Client) Query(ctx context.Context, creator Creator, results *[]any, que
}
// QueryRow runs a query and returns a single row
func (c *Client) QueryRow(ctx context.Context, dest any, query string, args ...interface{}) error {
func (c *Client) QueryRow(ctx context.Context, dest any, query string, args ...any) error {
row := c.conn.QueryRow(ctx, query, args...)
if row == nil {
c.log.Error("query row returned nil")
@@ -285,7 +287,7 @@ func (c *Client) QueryRow(ctx context.Context, dest any, query string, args ...i
}
// Select wraps a SELECT statement
func (c *Client) Select(ctx context.Context, dest any, query string, args ...interface{}) error {
func (c *Client) Select(ctx context.Context, dest any, query string, args ...any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
@@ -301,7 +303,7 @@ func (c *Client) Select(ctx context.Context, dest any, query string, args ...int
}
// Exec runs a non-query statement
func (c *Client) Exec(ctx context.Context, query string, args ...interface{}) error {
func (c *Client) Exec(ctx context.Context, query string, args ...any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
@@ -315,23 +317,268 @@ func (c *Client) Exec(ctx context.Context, query string, args ...interface{}) er
return nil
}
// AsyncInsert inserts data asynchronously
func (c *Client) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
func (c *Client) prepareInsertData(data any) (string, string, []any, error) {
val := reflect.ValueOf(data)
if val.Kind() != reflect.Ptr || val.IsNil() {
return "", "", nil, fmt.Errorf("data must be a non-nil pointer")
}
val = val.Elem()
typ := val.Type()
columns := make([]string, 0, typ.NumField())
placeholders := make([]string, 0, typ.NumField())
values := structToValueArray(data)
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
// prefer the `ch` tag, then the `json` tag, and finally fall back to the field name
columnName := field.Tag.Get("ch")
if columnName == "" {
jsonTag := field.Tag.Get("json")
if jsonTag != "" {
tags := strings.Split(jsonTag, ",") // take only the part before the first comma
if len(tags) > 0 {
columnName = tags[0]
}
}
}
if columnName == "" {
columnName = field.Name
}
//columnName = strings.TrimSpace(columnName)
columns = append(columns, columnName)
placeholders = append(placeholders, "?")
}
return strings.Join(columns, ", "), strings.Join(placeholders, ", "), values, nil
}
// Insert inserts a single row into the given table
func (c *Client) Insert(ctx context.Context, tableName string, in any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if err := c.conn.AsyncInsert(ctx, query, wait, args...); err != nil {
c.log.Errorf("exec failed: %v", err)
columns, placeholders, values, err := c.prepareInsertData(in)
if err != nil {
c.log.Errorf("prepare insert in failed: %v", err)
return ErrPrepareInsertDataFailed
}
// build the SQL statement
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
tableName,
columns,
placeholders,
)
// run the insert
if err = c.conn.Exec(ctx, query, values...); err != nil {
c.log.Errorf("insert failed: %v", err)
return ErrInsertFailed
}
return nil
}
func (c *Client) InsertMany(ctx context.Context, tableName string, data []any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if len(data) == 0 {
c.log.Error("data slice is empty")
return ErrInvalidColumnData
}
var columns string
var placeholders []string
var values []any
for _, item := range data {
itemColumns, itemPlaceholders, itemValues, err := c.prepareInsertData(item)
if err != nil {
c.log.Errorf("prepare insert data failed: %v", err)
return ErrPrepareInsertDataFailed
}
if columns == "" {
columns = itemColumns
} else if columns != itemColumns {
c.log.Error("data items have inconsistent columns")
return ErrInvalidColumnData
}
placeholders = append(placeholders, fmt.Sprintf("(%s)", itemPlaceholders))
values = append(values, itemValues...)
}
// build the SQL statement (placeholders are already wrapped per row)
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
tableName,
columns,
strings.Join(placeholders, ", "),
)
// run the insert
if err := c.conn.Exec(ctx, query, values...); err != nil {
c.log.Errorf("insert many failed: %v", err)
return ErrInsertFailed
}
return nil
}
// AsyncInsert inserts a single row asynchronously
func (c *Client) AsyncInsert(ctx context.Context, tableName string, data any, wait bool) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
// prepare the insert data
columns, placeholders, values, err := c.prepareInsertData(data)
if err != nil {
c.log.Errorf("prepare insert data failed: %v", err)
return ErrPrepareInsertDataFailed
}
// build the SQL statement
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
tableName,
columns,
placeholders,
)
// run the async insert
if err = c.asyncInsert(ctx, query, wait, values...); err != nil {
c.log.Errorf("async insert failed: %v", err)
return ErrAsyncInsertFailed
}
return nil
}
// asyncInsert executes a raw async INSERT statement
func (c *Client) asyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if err := c.conn.AsyncInsert(ctx, query, wait, args...); err != nil {
c.log.Errorf("async insert failed: %v", err)
return ErrAsyncInsertFailed
}
return nil
}
// AsyncInsertMany inserts multiple rows asynchronously
func (c *Client) AsyncInsertMany(ctx context.Context, tableName string, data []any, wait bool) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if len(data) == 0 {
c.log.Error("data slice is empty")
return ErrInvalidColumnData
}
// collect column names and placeholders for the insert
var columns string
var placeholders []string
var values []any
for _, item := range data {
itemColumns, itemPlaceholders, itemValues, err := c.prepareInsertData(item)
if err != nil {
c.log.Errorf("prepare insert data failed: %v", err)
return ErrPrepareInsertDataFailed
}
if columns == "" {
columns = itemColumns
} else if columns != itemColumns {
c.log.Error("data items have inconsistent columns")
return ErrInvalidColumnData
}
placeholders = append(placeholders, fmt.Sprintf("(%s)", itemPlaceholders))
values = append(values, itemValues...)
}
// build the SQL statement
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
tableName,
columns,
strings.Join(placeholders, ", "),
)
// run the async insert
if err := c.asyncInsert(ctx, query, wait, values...); err != nil {
c.log.Errorf("batch insert failed: %v", err)
return err
}
return nil
}
// BatchInsert inserts multiple rows using a prepared batch
func (c *Client) BatchInsert(ctx context.Context, query string, data [][]interface{}) error {
func (c *Client) BatchInsert(ctx context.Context, tableName string, data []any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if len(data) == 0 {
c.log.Error("data slice is empty")
return ErrInvalidColumnData
}
// collect column names and row values for the batch
var columns string
var values [][]any
for _, item := range data {
itemColumns, _, itemValues, err := c.prepareInsertData(item)
if err != nil {
c.log.Errorf("prepare insert data failed: %v", err)
return ErrPrepareInsertDataFailed
}
if columns == "" {
columns = itemColumns
} else if columns != itemColumns {
c.log.Error("data items have inconsistent columns")
return ErrInvalidColumnData
}
values = append(values, itemValues)
}
// build the SQL statement
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES", tableName, columns)
// delegate to batchExec to run the batch insert
if err := c.batchExec(ctx, query, values); err != nil {
c.log.Errorf("batch insert failed: %v", err)
return ErrBatchInsertFailed
}
return nil
}
// batchExec prepares, fills, and sends a batch
func (c *Client) batchExec(ctx context.Context, query string, data [][]any) error {
batch, err := c.conn.PrepareBatch(ctx, query)
if err != nil {
c.log.Errorf("failed to prepare batch: %v", err)
@@ -339,8 +586,8 @@ func (c *Client) BatchInsert(ctx context.Context, query string, data [][]interfa
}
for _, row := range data {
if err := batch.Append(row...); err != nil {
c.log.Errorf("failed to append data: %v", err)
if err = batch.Append(row...); err != nil {
c.log.Errorf("failed to append batch data: %v", err)
return ErrBatchAppendFailed
}
}
@@ -352,3 +599,34 @@ func (c *Client) BatchInsert(ctx context.Context, query string, data [][]interfa
return nil
}
// BatchStructs batch-inserts struct values via AppendStruct
func (c *Client) BatchStructs(ctx context.Context, query string, data []any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
// prepare the batch
batch, err := c.conn.PrepareBatch(ctx, query)
if err != nil {
c.log.Errorf("failed to prepare batch: %v", err)
return ErrBatchPrepareFailed
}
// append each row to the batch
for _, row := range data {
if err := batch.AppendStruct(row); err != nil {
c.log.Errorf("failed to append batch struct data: %v", err)
return ErrBatchAppendFailed
}
}
// send the batch
if err = batch.Send(); err != nil {
c.log.Errorf("failed to send batch: %v", err)
return ErrBatchSendFailed
}
return nil
}
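For orientation: `prepareInsertData` resolves each column name by checking the `ch` tag first, then the first comma-separated segment of the `json` tag, and finally falling back to the Go field name, while the values come from `structToValueArray` in field order. A hedged sketch of what `Insert` would build for a made-up type (struct, table name, and values below are purely illustrative):

```go
package clickhouse

// Event is a hypothetical type used only to illustrate column-name resolution.
type Event struct {
	ID      uint64 `ch:"id"`               // ch tag wins: column "id"
	Name    string `json:"name,omitempty"` // no ch tag: column "name" from the json tag
	Payload string                         // no tags: the field name "Payload" is used as-is
}

// client.Insert(ctx, "events", &Event{ID: 1, Name: "signup", Payload: "{}"})
// is expected to execute roughly:
//
//	INSERT INTO events (id, name, Payload) VALUES (?, ?, ?)
//
// with the three values supplied by structToValueArray in declaration order.
```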

View File

@@ -11,28 +11,25 @@ import (
)
type Candle struct {
Symbol string `json:"symbol" ch:"symbol"`
Open float64 `json:"open" ch:"open"`
High float64 `json:"high" ch:"high"`
Low float64 `json:"low" ch:"low"`
Close float64 `json:"close" ch:"close"`
Volume float64 `json:"volume" ch:"volume"`
Timestamp time.Time `json:"timestamp" ch:"timestamp"`
Timestamp *time.Time `json:"timestamp" ch:"timestamp"`
Symbol *string `json:"symbol" ch:"symbol"`
Open *float64 `json:"open" ch:"open"`
High *float64 `json:"high" ch:"high"`
Low *float64 `json:"low" ch:"low"`
Close *float64 `json:"close" ch:"close"`
Volume *float64 `json:"volume" ch:"volume"`
}
func createTestClient() *Client {
database := "finances"
username := "default"
password := "*Abcd123456"
cli, _ := NewClient(
log.DefaultLogger,
&conf.Bootstrap{
Data: &conf.Data{
Clickhouse: &conf.Data_ClickHouse{
Addresses: []string{"localhost:9000"},
Database: &database,
Username: &username,
Password: &password,
Database: Ptr("finances"),
Username: Ptr("default"),
Password: Ptr("*Abcd123456"),
},
},
},
@@ -44,7 +41,7 @@ func createCandlesTable(client *Client) {
// SQL statement that creates the table
createTableQuery := `
CREATE TABLE IF NOT EXISTS candles (
timestamp DateTime,
timestamp DateTime64(3),
symbol String,
open Float64,
high Float64,
@@ -76,32 +73,148 @@ func TestNewClient(t *testing.T) {
createCandlesTable(client)
}
func TestAsyncInsert(t *testing.T) {
func TestInsertCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
// test async insert
err := client.AsyncInsert(context.Background(), "INSERT INTO test_table (id, name) VALUES (?, ?)", true, 1, "example")
assert.NoError(t, err, "AsyncInsert should succeed")
}
func TestBatchInsert(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// test data
data := [][]interface{}{
{1, "example1"},
{2, "example2"},
{3, "example3"},
candle := &Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
}
// test batch insert
err := client.BatchInsert(context.Background(), "INSERT INTO test_table (id, name) VALUES (?, ?)", data)
assert.NoError(t, err, "BatchInsert should succeed")
// insert the row
err := client.Insert(context.Background(), "candles", candle)
assert.NoError(t, err, "Insert should succeed")
}
func TestInsertIntoCandlesTable(t *testing.T) {
func TestInsertManyCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// test data
data := []any{
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("GOOG"),
Open: Ptr(200.5),
High: Ptr(205.0),
Low: Ptr(199.5),
Close: Ptr(202.0),
Volume: Ptr(2500.0),
},
}
// insert the rows
err := client.InsertMany(context.Background(), "candles", data)
assert.NoError(t, err, "InsertMany should succeed")
}
func TestAsyncInsertCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// test data
candle := &Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("BTC/USD"),
Open: Ptr(30000.0),
High: Ptr(31000.0),
Low: Ptr(29000.0),
Close: Ptr(30500.0),
Volume: Ptr(500.0),
}
// insert the row asynchronously
err := client.AsyncInsert(context.Background(), "candles", candle, true)
assert.NoError(t, err, "AsyncInsert should succeed")
// verify the inserted row
query := `
SELECT timestamp, symbol, open, high, low, close, volume
FROM candles
WHERE symbol = ?
`
var result Candle
err = client.QueryRow(context.Background(), &result, query, "BTC/USD")
assert.NoError(t, err, "QueryRow 应该成功执行")
assert.Equal(t, "BTC/USD", *result.Symbol, "symbol 列值应该为 BTC/USD")
assert.Equal(t, 30500.0, *result.Close, "close 列值应该为 30500.0")
assert.Equal(t, 500.0, *result.Volume, "volume 列值应该为 500.0")
}
func TestAsyncInsertManyCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// test data
data := []any{
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("GOOG"),
Open: Ptr(200.5),
High: Ptr(205.0),
Low: Ptr(199.5),
Close: Ptr(202.0),
Volume: Ptr(2500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("MSFT"),
Open: Ptr(300.5),
High: Ptr(305.0),
Low: Ptr(299.5),
Close: Ptr(302.0),
Volume: Ptr(3500.0),
},
}
// insert the rows asynchronously
err := client.AsyncInsertMany(context.Background(), "candles", data, true)
assert.NoError(t, err, "AsyncInsertMany should succeed")
// verify the inserted rows
query := `
SELECT timestamp, symbol, open, high, low, close, volume
FROM candles
`
var results []Candle
err = client.Select(context.Background(), &results, query)
assert.NoError(t, err, "查询数据应该成功执行")
}
func TestInternalBatchExecCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
@@ -114,7 +227,121 @@ func TestInsertIntoCandlesTable(t *testing.T) {
`
// test data
err := client.AsyncInsert(context.Background(), insertQuery, true,
data := [][]interface{}{
{"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0},
{"2023-10-01 12:01:00", "GOOG", 200.5, 205.0, 199.5, 202.0, 2500.0},
{"2023-10-01 12:02:00", "MSFT", 300.5, 305.0, 299.5, 302.0, 3500.0},
}
// insert the rows in a batch
err := client.batchExec(context.Background(), insertQuery, data)
assert.NoError(t, err, "batchExec should succeed")
}
func TestBatchInsertCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// test data
data := []any{
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("GOOG"),
Open: Ptr(200.5),
High: Ptr(205.0),
Low: Ptr(199.5),
Close: Ptr(202.0),
Volume: Ptr(2500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("MSFT"),
Open: Ptr(300.5),
High: Ptr(305.0),
Low: Ptr(299.5),
Close: Ptr(302.0),
Volume: Ptr(3500.0),
},
}
// insert the rows in a batch
err := client.BatchInsert(context.Background(), "candles", data)
assert.NoError(t, err, "BatchInsert should succeed")
// verify the inserted rows
query := `
SELECT timestamp, symbol, open, high, low, close, volume
FROM candles
`
var results []Candle
err = client.Select(context.Background(), &results, query)
assert.NoError(t, err, "查询数据应该成功执行")
}
func TestBatchStructsCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// SQL statement for the insert
insertQuery := `
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
// test data
data := []any{
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
},
&Candle{
Timestamp: Ptr(time.Now()),
Symbol: Ptr("GOOG"),
Open: Ptr(200.5),
High: Ptr(205.0),
Low: Ptr(199.5),
Close: Ptr(202.0),
Volume: Ptr(2500.0),
},
}
// insert the structs in a batch
err := client.BatchStructs(context.Background(), insertQuery, data)
assert.NoError(t, err, "BatchStructs should succeed")
}
func TestInternalAsyncInsertIntoCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// SQL statement for the insert
insertQuery := `
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
// test data
err := client.asyncInsert(context.Background(), insertQuery, true,
"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0)
assert.NoError(t, err, "InsertIntoCandlesTable 应该成功执行")
}
@@ -138,6 +365,22 @@ func TestQueryCandlesTable(t *testing.T) {
err := client.Query(context.Background(), func() interface{} { return &Candle{} }, &results, query)
assert.NoError(t, err, "QueryCandlesTable 应该成功执行")
assert.NotEmpty(t, results, "QueryCandlesTable 应该返回结果")
for _, result := range results {
candle, ok := result.(*Candle)
assert.True(t, ok, "结果应该是 Candle 类型")
assert.NotNil(t, candle.Timestamp, "Timestamp 列不应该为 nil")
assert.NotNil(t, candle.Symbol, "Symbol 列不应该为 nil")
assert.NotNil(t, candle.Open, "Open 列不应该为 nil")
assert.NotNil(t, candle.High, "High 列不应该为 nil")
assert.NotNil(t, candle.Low, "Low 列不应该为 nil")
assert.NotNil(t, candle.Close, "Close 列不应该为 nil")
assert.NotNil(t, candle.Volume, "Volume 列不应该为 nil")
t.Logf("[%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
candle.Timestamp.String(),
*candle.Symbol,
*candle.Open, *candle.High, *candle.Low, *candle.Close, *candle.Volume,
)
}
}
func TestSelectCandlesTable(t *testing.T) {
@@ -159,6 +402,21 @@ func TestSelectCandlesTable(t *testing.T) {
err := client.Select(context.Background(), &results, query)
assert.NoError(t, err, "QueryCandlesTable 应该成功执行")
assert.NotEmpty(t, results, "QueryCandlesTable 应该返回结果")
for _, result := range results {
assert.NotNil(t, result.Timestamp, "Timestamp 列不应该为 nil")
assert.NotNil(t, result.Symbol, "Symbol 列不应该为 nil")
assert.NotNil(t, result.Open, "Open 列不应该为 nil")
assert.NotNil(t, result.High, "High 列不应该为 nil")
assert.NotNil(t, result.Low, "Low 列不应该为 nil")
assert.NotNil(t, result.Close, "Close 列不应该为 nil")
assert.NotNil(t, result.Volume, "Volume 列不应该为 nil")
t.Logf("[%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
result.Timestamp.String(),
*result.Symbol,
*result.Open, *result.High, *result.Low, *result.Close, *result.Volume,
)
}
}
func TestQueryRow(t *testing.T) {
@@ -172,7 +430,7 @@ func TestQueryRow(t *testing.T) {
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
err := client.AsyncInsert(context.Background(), insertQuery, true,
err := client.asyncInsert(context.Background(), insertQuery, true,
"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0)
assert.NoError(t, err, "数据插入失败")
@@ -182,13 +440,19 @@ func TestQueryRow(t *testing.T) {
FROM candles
WHERE symbol = ?
`
var result Candle
err = client.QueryRow(context.Background(), &result, query, "AAPL")
assert.NoError(t, err, "QueryRow 应该成功执行")
assert.Equal(t, "AAPL", result.Symbol, "symbol 列值应该为 AAPL")
assert.Equal(t, 100.5, result.Open, "open 列值应该为 100.5")
assert.Equal(t, 1500.0, result.Volume, "volume 列值应该为 1500.0")
assert.Equal(t, "AAPL", *result.Symbol, "symbol 列值应该为 AAPL")
assert.Equal(t, 100.5, *result.Open, "open 列值应该为 100.5")
assert.Equal(t, 1500.0, *result.Volume, "volume 列值应该为 1500.0")
t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
result.Timestamp.String(),
*result.Symbol,
*result.Open, *result.High, *result.Low, *result.Close, *result.Volume,
)
}
func TestDropCandlesTable(t *testing.T) {
@@ -234,23 +498,3 @@ func TestAggregateCandlesTable(t *testing.T) {
assert.NoError(t, err, "AggregateCandlesTable 应该成功执行")
assert.NotEmpty(t, results, "AggregateCandlesTable 应该返回结果")
}
func TestBatchInsertCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// test data
data := [][]interface{}{
{"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0},
{"2023-10-01 12:01:00", "GOOG", 200.5, 205.0, 199.5, 202.0, 2500.0},
{"2023-10-01 12:02:00", "MSFT", 300.5, 305.0, 299.5, 302.0, 3500.0},
}
// insert the rows in a batch
err := client.BatchInsert(context.Background(), `
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)`, data)
assert.NoError(t, err, "BatchInsertCandlesTable 应该成功执行")
}

View File

@@ -72,9 +72,18 @@ var (
// ErrBatchAppendFailed is returned when appending to a batch fails.
ErrBatchAppendFailed = errors.InternalServer("BATCH_APPEND_FAILED", "batch append operation failed")
// ErrBatchInsertFailed is returned when a batch insert operation fails.
ErrBatchInsertFailed = errors.InternalServer("BATCH_INSERT_FAILED", "batch insert operation failed")
// ErrInvalidDSN is returned when the data source name (DSN) is invalid.
ErrInvalidDSN = errors.InternalServer("INVALID_DSN", "invalid data source name")
// ErrInvalidProxyURL is returned when the proxy URL is invalid.
ErrInvalidProxyURL = errors.InternalServer("INVALID_PROXY_URL", "invalid proxy URL")
// ErrPrepareInsertDataFailed is returned when preparing insert data fails.
ErrPrepareInsertDataFailed = errors.InternalServer("PREPARE_INSERT_DATA_FAILED", "failed to prepare insert data")
// ErrInvalidColumnData is returned when the column data type is invalid.
ErrInvalidColumnData = errors.InternalServer("INVALID_COLUMN_DATA", "invalid column data type")
)

View File

@@ -0,0 +1,146 @@
package clickhouse
import (
"database/sql"
"reflect"
"time"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
timeFormat = "2006-01-02 15:04:05.000000000"
)
func structToValueArray(input any) []any {
// dereference pointer inputs
val := reflect.ValueOf(input)
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
// only structs are supported
if val.Kind() != reflect.Struct {
return nil
}
var values []any
for i := 0; i < val.NumField(); i++ {
value := val.Field(i).Interface()
switch v := value.(type) {
case *sql.NullString:
if v.Valid {
values = append(values, v.String)
} else {
values = append(values, nil)
}
case *sql.NullInt64:
if v.Valid {
values = append(values, v.Int64)
} else {
values = append(values, nil)
}
case *sql.NullFloat64:
if v.Valid {
values = append(values, v.Float64)
} else {
values = append(values, nil)
}
case *sql.NullBool:
if v.Valid {
values = append(values, v.Bool)
} else {
values = append(values, nil)
}
case *sql.NullTime:
if v != nil && v.Valid {
values = append(values, v.Time.Format(timeFormat))
} else {
values = append(values, nil)
}
case *time.Time:
if v != nil {
values = append(values, v.Format(timeFormat))
} else {
values = append(values, nil)
}
case time.Time:
// handle time.Time values
if !v.IsZero() {
values = append(values, v.Format(timeFormat))
} else {
values = append(values, nil) // zero time is inserted as NULL
}
case timestamppb.Timestamp:
// handle timestamppb.Timestamp values
if v.IsValid() {
values = append(values, v.AsTime().Format(timeFormat))
} else {
values = append(values, nil) // invalid timestamp is inserted as NULL
}
case *timestamppb.Timestamp:
// handle *timestamppb.Timestamp values
if v != nil && v.IsValid() {
values = append(values, v.AsTime().Format(timeFormat))
} else {
values = append(values, nil) // nil or invalid timestamp is inserted as NULL
}
case durationpb.Duration:
// handle durationpb.Duration values
if v.AsDuration() != 0 {
values = append(values, v.AsDuration().String())
} else {
values = append(values, nil) // zero duration is inserted as NULL
}
case *durationpb.Duration:
// handle *durationpb.Duration values
if v != nil && v.AsDuration() != 0 {
values = append(values, v.AsDuration().String())
} else {
values = append(values, nil) // nil or zero duration is inserted as NULL
}
case []any:
// handle slice values
if len(v) > 0 {
for _, item := range v {
if item == nil {
values = append(values, nil)
} else {
values = append(values, item)
}
}
} else {
values = append(values, nil) // empty slice is inserted as NULL
}
case [][]any:
// handle two-dimensional slice values
if len(v) > 0 {
for _, item := range v {
if len(item) > 0 {
values = append(values, item)
} else {
values = append(values, nil) // empty sub-slice is inserted as NULL
}
}
} else {
values = append(values, nil) // empty slice is inserted as NULL
}
default:
values = append(values, v)
}
}
return values
}
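structToValueArray flattens a struct's fields into driver-ready values: valid `sql.Null*` wrappers are unwrapped, invalid or nil ones become SQL NULL, time values are rendered with `timeFormat`, and protobuf durations via their `String()` form. A small sketch under those rules, with a made-up `row` type used purely for illustration:

```go
package clickhouse

import (
	"database/sql"
	"time"
)

// row is a hypothetical type used only to illustrate the conversion rules.
type row struct {
	Name   *sql.NullString
	Score  *sql.NullFloat64
	SeenAt time.Time
}

func exampleConversion() []any {
	r := row{
		Name:   &sql.NullString{String: "alice", Valid: true}, // valid -> "alice"
		Score:  &sql.NullFloat64{Valid: false},                // invalid -> nil (SQL NULL)
		SeenAt: time.Date(2025, 6, 29, 9, 41, 13, 0, time.UTC),
	}
	// Expected result: []any{"alice", nil, "2025-06-29 09:41:13.000000000"}
	return structToValueArray(r)
}
```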

View File

@@ -0,0 +1,83 @@
package clickhouse
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// Ptr returns a pointer to the provided value.
func Ptr[T any](v T) *T {
return &v
}
func TestStructToValueArrayWithCandle(t *testing.T) {
now := time.Now()
candle := Candle{
Timestamp: Ptr(now),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
}
values := structToValueArray(candle)
assert.NotNil(t, values, "Values should not be nil")
assert.Len(t, values, 7, "Expected 7 fields in the Candle struct")
assert.Equal(t, now.Format(timeFormat), values[0].(string), "Timestamp should match")
assert.Equal(t, *candle.Symbol, *(values[1].(*string)), "Symbol should match")
assert.Equal(t, *candle.Open, *values[2].(*float64), "Open price should match")
assert.Equal(t, *candle.High, *values[3].(*float64), "High price should match")
assert.Equal(t, *candle.Low, *values[4].(*float64), "Low price should match")
assert.Equal(t, *candle.Close, *values[5].(*float64), "Close price should match")
assert.Equal(t, *candle.Volume, *values[6].(*float64), "Volume should match")
t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
values[0],
*(values[1].(*string)),
*values[2].(*float64),
*values[3].(*float64),
*values[4].(*float64),
*values[5].(*float64),
*values[6].(*float64),
)
}
func TestStructToValueArrayWithCandlePtr(t *testing.T) {
now := time.Now()
candle := &Candle{
Timestamp: Ptr(now),
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
}
values := structToValueArray(candle)
assert.NotNil(t, values, "Values should not be nil")
assert.Len(t, values, 7, "Expected 7 fields in the Candle struct")
assert.Equal(t, now.Format(timeFormat), values[0].(string), "Timestamp should match")
assert.Equal(t, *candle.Symbol, *(values[1].(*string)), "Symbol should match")
assert.Equal(t, *candle.Open, *values[2].(*float64), "Open price should match")
assert.Equal(t, *candle.High, *values[3].(*float64), "High price should match")
assert.Equal(t, *candle.Low, *values[4].(*float64), "Low price should match")
assert.Equal(t, *candle.Close, *values[5].(*float64), "Close price should match")
assert.Equal(t, *candle.Volume, *values[6].(*float64), "Volume should match")
t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
values[0],
*(values[1].(*string)),
*values[2].(*float64),
*values[3].(*float64),
*values[4].(*float64),
*values[5].(*float64),
*values[6].(*float64),
)
}

View File

@@ -14,3 +14,19 @@
- dynamic mapping
- explicit mapping
- strict mapping
## Docker deployment
```bash
docker pull bitnami/elasticsearch:latest
docker run -itd \
--name elasticsearch \
-p 9200:9200 \
-p 9300:9300 \
-e ELASTICSEARCH_USERNAME=elastic \
-e ELASTICSEARCH_PASSWORD=elastic \
-e ELASTICSEARCH_NODE_NAME=elasticsearch-node-1 \
-e ELASTICSEARCH_CLUSTER_NAME=elasticsearch-cluster \
bitnami/elasticsearch:latest
```

View File

@@ -26,6 +26,7 @@ func NewLogger(cfg *conf.Logger) log.Logger {
fallthrough
case "text":
loggerFormatter = &logrus.TextFormatter{
ForceColors: cfg.Logrus.ForceColors,
DisableColors: cfg.Logrus.DisableColors,
DisableTimestamp: cfg.Logrus.DisableTimestamp,
TimestampFormat: cfg.Logrus.TimestampFormat,

View File

@@ -12,7 +12,7 @@ git tag database/ent/v0.0.10 --force
git tag database/gorm/v0.0.10 --force
git tag database/mongodb/v0.0.12 --force
git tag database/influxdb/v0.0.12 --force
git tag database/clickhouse/v0.0.12 --force
git tag database/clickhouse/v0.0.14 --force
git tag database/elasticsearch/v0.0.12 --force
git tag database/cassandra/v0.0.10 --force