feat: database.

This commit is contained in:
Bobo
2025-06-29 11:12:16 +08:00
parent ac6f0d1987
commit 8c017a34e0
4 changed files with 377 additions and 59 deletions

View File

@@ -4,7 +4,11 @@ import (
"context"
"crypto/tls"
"database/sql"
"fmt"
"net/url"
"reflect"
"strings"
"time"
clickhouseV2 "github.com/ClickHouse/clickhouse-go/v2"
driverV2 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
@@ -229,7 +233,7 @@ func (c *Client) CheckConnection(ctx context.Context) error {
}
// Query 执行查询并返回结果
func (c *Client) Query(ctx context.Context, creator Creator, results *[]any, query string, args ...interface{}) error {
func (c *Client) Query(ctx context.Context, creator Creator, results *[]any, query string, args ...any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
@@ -269,7 +273,7 @@ func (c *Client) Query(ctx context.Context, creator Creator, results *[]any, que
}
// QueryRow 执行查询并返回单行结果
func (c *Client) QueryRow(ctx context.Context, dest any, query string, args ...interface{}) error {
func (c *Client) QueryRow(ctx context.Context, dest any, query string, args ...any) error {
row := c.conn.QueryRow(ctx, query, args...)
if row == nil {
c.log.Error("query row returned nil")
@@ -285,7 +289,7 @@ func (c *Client) QueryRow(ctx context.Context, dest any, query string, args ...i
}
// Select 封装 SELECT 子句
func (c *Client) Select(ctx context.Context, dest any, query string, args ...interface{}) error {
func (c *Client) Select(ctx context.Context, dest any, query string, args ...any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
@@ -301,7 +305,7 @@ func (c *Client) Select(ctx context.Context, dest any, query string, args ...int
}
// Exec 执行非查询语句
func (c *Client) Exec(ctx context.Context, query string, args ...interface{}) error {
func (c *Client) Exec(ctx context.Context, query string, args ...any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
@@ -315,15 +319,178 @@ func (c *Client) Exec(ctx context.Context, query string, args ...interface{}) er
return nil
}
func (c *Client) prepareInsertData(data any) (string, string, []any, error) {
val := reflect.ValueOf(data)
if val.Kind() != reflect.Ptr || val.IsNil() {
return "", "", nil, fmt.Errorf("data must be a non-nil pointer")
}
val = val.Elem()
typ := val.Type()
var columns []string
var placeholders []string
var values []any
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
value := val.Field(i).Interface()
// 优先获取 `cn` 标签,其次获取 `json` 标签,最后使用字段名
columnName := field.Tag.Get("cn")
if columnName == "" {
columnName = field.Tag.Get("json")
}
if columnName == "" {
columnName = field.Name
}
columns = append(columns, columnName)
placeholders = append(placeholders, "?")
switch v := value.(type) {
case *sql.NullString:
if v.Valid {
values = append(values, v.String)
} else {
values = append(values, nil)
}
case *sql.NullInt64:
if v.Valid {
values = append(values, v.Int64)
} else {
values = append(values, nil)
}
case *sql.NullFloat64:
if v.Valid {
values = append(values, v.Float64)
} else {
values = append(values, nil)
}
case *sql.NullBool:
if v.Valid {
values = append(values, v.Bool)
} else {
values = append(values, nil)
}
case *sql.NullTime:
if v != nil && v.Valid {
values = append(values, v.Time.Format("2006-01-02 15:04:05.000000000"))
} else {
values = append(values, nil)
}
case *time.Time:
if v != nil {
values = append(values, v.Format("2006-01-02 15:04:05.000000000"))
} else {
values = append(values, nil)
}
case time.Time:
// 处理 time.Time 类型
if !v.IsZero() {
values = append(values, v.Format("2006-01-02 15:04:05.000000000"))
} else {
values = append(values, nil) // 如果时间为零值,插入 NULL
}
default:
values = append(values, v)
}
}
return strings.Join(columns, ", "), strings.Join(placeholders, ", "), values, nil
}
// Insert 插入数据到指定表
func (c *Client) Insert(ctx context.Context, tableName string, data any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
columns, placeholders, values, err := c.prepareInsertData(data)
if err != nil {
c.log.Errorf("prepare insert data failed: %v", err)
return ErrPrepareInsertDataFailed
}
// 构造 SQL 语句
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
tableName,
columns,
placeholders,
)
// 执行插入操作
if err := c.conn.Exec(ctx, query, values...); err != nil {
c.log.Errorf("insert failed: %v", err)
return ErrInsertFailed
}
return nil
}
func (c *Client) InsertMany(ctx context.Context, tableName string, data []any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if len(data) == 0 {
c.log.Error("data slice is empty")
return ErrInvalidColumnData
}
var columns string
var placeholders []string
var values []any
for _, item := range data {
itemColumns, itemPlaceholders, itemValues, err := c.prepareInsertData(item)
if err != nil {
c.log.Errorf("prepare insert data failed: %v", err)
return ErrPrepareInsertDataFailed
}
if columns == "" {
columns = itemColumns
} else if columns != itemColumns {
c.log.Error("data items have inconsistent columns")
return ErrInvalidColumnData
}
placeholders = append(placeholders, fmt.Sprintf("(%s)", itemPlaceholders))
values = append(values, itemValues...)
}
// 构造 SQL 语句
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
tableName,
columns,
strings.Join(placeholders, ", "),
)
// 执行插入操作
if err := c.conn.Exec(ctx, query, values...); err != nil {
c.log.Errorf("insert many failed: %v", err)
return ErrInsertFailed
}
return nil
}
// AsyncInsert 异步插入数据
func (c *Client) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
func (c *Client) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
if err := c.conn.AsyncInsert(ctx, query, wait, args...); err != nil {
c.log.Errorf("exec failed: %v", err)
c.log.Errorf("async insert failed: %v", err)
return ErrAsyncInsertFailed
}
@@ -331,7 +498,7 @@ func (c *Client) AsyncInsert(ctx context.Context, query string, wait bool, args
}
// BatchInsert 批量插入数据
func (c *Client) BatchInsert(ctx context.Context, query string, data [][]interface{}) error {
func (c *Client) BatchInsert(ctx context.Context, query string, data [][]any) error {
batch, err := c.conn.PrepareBatch(ctx, query)
if err != nil {
c.log.Errorf("failed to prepare batch: %v", err)
@@ -339,8 +506,8 @@ func (c *Client) BatchInsert(ctx context.Context, query string, data [][]interfa
}
for _, row := range data {
if err := batch.Append(row...); err != nil {
c.log.Errorf("failed to append data: %v", err)
if err = batch.Append(row...); err != nil {
c.log.Errorf("failed to append batch data: %v", err)
return ErrBatchAppendFailed
}
}
@@ -352,3 +519,34 @@ func (c *Client) BatchInsert(ctx context.Context, query string, data [][]interfa
return nil
}
// BatchInsertStructs 批量插入结构体数据
func (c *Client) BatchInsertStructs(ctx context.Context, query string, data []any) error {
if c.conn == nil {
c.log.Error("clickhouse client is not initialized")
return ErrClientNotInitialized
}
// 准备批量插入
batch, err := c.conn.PrepareBatch(ctx, query)
if err != nil {
c.log.Errorf("failed to prepare batch: %v", err)
return ErrBatchPrepareFailed
}
// 遍历数据并添加到批量插入
for _, row := range data {
if err := batch.AppendStruct(row); err != nil {
c.log.Errorf("failed to append batch struct data: %v", err)
return ErrBatchAppendFailed
}
}
// 发送批量插入
if err = batch.Send(); err != nil {
c.log.Errorf("failed to send batch: %v", err)
return ErrBatchSendFailed
}
return nil
}

View File

@@ -10,29 +10,30 @@ import (
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
)
func Ptr[T any](v T) *T {
return &v
}
type Candle struct {
Symbol string `json:"symbol" ch:"symbol"`
Open float64 `json:"open" ch:"open"`
High float64 `json:"high" ch:"high"`
Low float64 `json:"low" ch:"low"`
Close float64 `json:"close" ch:"close"`
Volume float64 `json:"volume" ch:"volume"`
Timestamp time.Time `json:"timestamp" ch:"timestamp"`
Timestamp *time.Time `json:"timestamp" ch:"timestamp"`
Symbol *string `json:"symbol" ch:"symbol"`
Open *float64 `json:"open" ch:"open"`
High *float64 `json:"high" ch:"high"`
Low *float64 `json:"low" ch:"low"`
Close *float64 `json:"close" ch:"close"`
Volume *float64 `json:"volume" ch:"volume"`
}
func createTestClient() *Client {
database := "finances"
username := "default"
password := "*Abcd123456"
cli, _ := NewClient(
log.DefaultLogger,
&conf.Bootstrap{
Data: &conf.Data{
Clickhouse: &conf.Data_ClickHouse{
Addresses: []string{"localhost:9000"},
Database: &database,
Username: &username,
Password: &password,
Database: Ptr("finances"),
Username: Ptr("default"),
Password: Ptr("*Abcd123456"),
},
},
},
@@ -44,7 +45,7 @@ func createCandlesTable(client *Client) {
// 创建表的 SQL 语句
createTableQuery := `
CREATE TABLE IF NOT EXISTS candles (
timestamp DateTime,
timestamp DateTime64(3),
symbol String,
open Float64,
high Float64,
@@ -76,32 +77,128 @@ func TestNewClient(t *testing.T) {
createCandlesTable(client)
}
func TestAsyncInsert(t *testing.T) {
func TestInsertCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
// 测试异步插入
err := client.AsyncInsert(context.Background(), "INSERT INTO test_table (id, name) VALUES (?, ?)", true, 1, "example")
assert.NoError(t, err, "AsyncInsert 应该成功执行")
createCandlesTable(client)
// 测试数据
now := time.Now()
candle := &Candle{
Timestamp: &now,
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
}
// 插入数据
err := client.Insert(context.Background(), "candles", candle)
assert.NoError(t, err, "InsertCandlesTable 应该成功执行")
}
func TestBatchInsert(t *testing.T) {
func TestInsertManyCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 测试数据
now := time.Now()
data := []any{
&Candle{
Timestamp: &now,
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
},
&Candle{
Timestamp: &now,
Symbol: Ptr("GOOG"),
Open: Ptr(200.5),
High: Ptr(205.0),
Low: Ptr(199.5),
Close: Ptr(202.0),
Volume: Ptr(2500.0),
},
}
// 插入数据
err := client.InsertMany(context.Background(), "candles", data)
assert.NoError(t, err, "InsertManyCandlesTable 应该成功执行")
}
func TestBatchInsertCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 插入数据的 SQL 语句
insertQuery := `
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
// 测试数据
data := [][]interface{}{
{1, "example1"},
{2, "example2"},
{3, "example3"},
{"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0},
{"2023-10-01 12:01:00", "GOOG", 200.5, 205.0, 199.5, 202.0, 2500.0},
{"2023-10-01 12:02:00", "MSFT", 300.5, 305.0, 299.5, 302.0, 3500.0},
}
// 测试批量插入
err := client.BatchInsert(context.Background(), "INSERT INTO test_table (id, name) VALUES (?, ?)", data)
assert.NoError(t, err, "BatchInsert 应该成功执行")
// 批量插入数据
err := client.BatchInsert(context.Background(), insertQuery, data)
assert.NoError(t, err, "BatchInsertCandlesTable 应该成功执行")
}
func TestInsertIntoCandlesTable(t *testing.T) {
func TestBatchInsertStructsCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 插入数据的 SQL 语句
insertQuery := `
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
// 测试数据
now := time.Now()
data := []any{
&Candle{
Timestamp: &now,
Symbol: Ptr("AAPL"),
Open: Ptr(100.5),
High: Ptr(105.0),
Low: Ptr(99.5),
Close: Ptr(102.0),
Volume: Ptr(1500.0),
},
&Candle{
Timestamp: &now,
Symbol: Ptr("GOOG"),
Open: Ptr(200.5),
High: Ptr(205.0),
Low: Ptr(199.5),
Close: Ptr(202.0),
Volume: Ptr(2500.0),
},
}
// 批量插入数据
err := client.BatchInsertStructs(context.Background(), insertQuery, data)
assert.NoError(t, err, "BatchInsertStructsCandlesTable 应该成功执行")
}
func TestAsyncInsertIntoCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
@@ -138,6 +235,22 @@ func TestQueryCandlesTable(t *testing.T) {
err := client.Query(context.Background(), func() interface{} { return &Candle{} }, &results, query)
assert.NoError(t, err, "QueryCandlesTable 应该成功执行")
assert.NotEmpty(t, results, "QueryCandlesTable 应该返回结果")
for _, result := range results {
candle, ok := result.(*Candle)
assert.True(t, ok, "结果应该是 Candle 类型")
assert.NotNil(t, candle.Timestamp, "Timestamp 列不应该为 nil")
assert.NotNil(t, candle.Symbol, "Symbol 列不应该为 nil")
assert.NotNil(t, candle.Open, "Open 列不应该为 nil")
assert.NotNil(t, candle.High, "High 列不应该为 nil")
assert.NotNil(t, candle.Low, "Low 列不应该为 nil")
assert.NotNil(t, candle.Close, "Close 列不应该为 nil")
assert.NotNil(t, candle.Volume, "Volume 列不应该为 nil")
t.Logf("[%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
candle.Timestamp.String(),
*candle.Symbol,
*candle.Open, *candle.High, *candle.Low, *candle.Close, *candle.Volume,
)
}
}
func TestSelectCandlesTable(t *testing.T) {
@@ -159,6 +272,21 @@ func TestSelectCandlesTable(t *testing.T) {
err := client.Select(context.Background(), &results, query)
assert.NoError(t, err, "QueryCandlesTable 应该成功执行")
assert.NotEmpty(t, results, "QueryCandlesTable 应该返回结果")
for _, result := range results {
assert.NotNil(t, result.Timestamp, "Timestamp 列不应该为 nil")
assert.NotNil(t, result.Symbol, "Symbol 列不应该为 nil")
assert.NotNil(t, result.Open, "Open 列不应该为 nil")
assert.NotNil(t, result.High, "High 列不应该为 nil")
assert.NotNil(t, result.Low, "Low 列不应该为 nil")
assert.NotNil(t, result.Close, "Close 列不应该为 nil")
assert.NotNil(t, result.Volume, "Volume 列不应该为 nil")
t.Logf("[%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
result.Timestamp.String(),
*result.Symbol,
*result.Open, *result.High, *result.Low, *result.Close, *result.Volume,
)
}
}
func TestQueryRow(t *testing.T) {
@@ -182,13 +310,19 @@ func TestQueryRow(t *testing.T) {
FROM candles
WHERE symbol = ?
`
var result Candle
err = client.QueryRow(context.Background(), &result, query, "AAPL")
assert.NoError(t, err, "QueryRow 应该成功执行")
assert.Equal(t, "AAPL", result.Symbol, "symbol 列值应该为 AAPL")
assert.Equal(t, 100.5, result.Open, "open 列值应该为 100.5")
assert.Equal(t, 1500.0, result.Volume, "volume 列值应该为 1500.0")
assert.Equal(t, "AAPL", *result.Symbol, "symbol 列值应该为 AAPL")
assert.Equal(t, 100.5, *result.Open, "open 列值应该为 100.5")
assert.Equal(t, 1500.0, *result.Volume, "volume 列值应该为 1500.0")
t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
result.Timestamp.String(),
*result.Symbol,
*result.Open, *result.High, *result.Low, *result.Close, *result.Volume,
)
}
func TestDropCandlesTable(t *testing.T) {
@@ -234,23 +368,3 @@ func TestAggregateCandlesTable(t *testing.T) {
assert.NoError(t, err, "AggregateCandlesTable 应该成功执行")
assert.NotEmpty(t, results, "AggregateCandlesTable 应该返回结果")
}
func TestBatchInsertCandlesTable(t *testing.T) {
client := createTestClient()
assert.NotNil(t, client)
createCandlesTable(client)
// 测试数据
data := [][]interface{}{
{"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0},
{"2023-10-01 12:01:00", "GOOG", 200.5, 205.0, 199.5, 202.0, 2500.0},
{"2023-10-01 12:02:00", "MSFT", 300.5, 305.0, 299.5, 302.0, 3500.0},
}
// 批量插入数据
err := client.BatchInsert(context.Background(), `
INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)`, data)
assert.NoError(t, err, "BatchInsertCandlesTable 应该成功执行")
}

View File

@@ -77,4 +77,10 @@ var (
// ErrInvalidProxyURL is returned when the proxy URL is invalid.
ErrInvalidProxyURL = errors.InternalServer("INVALID_PROXY_URL", "invalid proxy URL")
// ErrPrepareInsertDataFailed is returned when preparing insert data fails.
ErrPrepareInsertDataFailed = errors.InternalServer("PREPARE_INSERT_DATA_FAILED", "failed to prepare insert data")
// ErrInvalidColumnData is returned when the column data type is invalid.
ErrInvalidColumnData = errors.InternalServer("INVALID_COLUMN_DATA", "invalid column data type")
)

View File

@@ -12,7 +12,7 @@ git tag database/ent/v0.0.10 --force
git tag database/gorm/v0.0.10 --force
git tag database/mongodb/v0.0.12 --force
git tag database/influxdb/v0.0.12 --force
git tag database/clickhouse/v0.0.12 --force
git tag database/clickhouse/v0.0.13 --force
git tag database/elasticsearch/v0.0.12 --force
git tag database/cassandra/v0.0.10 --force