Compare commits
7 Commits
database/m
...
logger/v0.
| Author | SHA1 | Date | |
|---|---|---|---|
| c2726db8a1 | |||
|
|
47c72651db | ||
|
|
f267c19c73 | ||
|
|
8c017a34e0 | ||
|
|
ac6f0d1987 | ||
|
|
29a8782662 | ||
|
|
d0e55cf372 |
@@ -604,30 +604,30 @@ func (x *Data_MongoDB) GetEnableMetrics() bool {
|
||||
|
||||
// ClickHouse
|
||||
type Data_ClickHouse struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
Addresses []string `protobuf:"bytes,1,rep,name=addresses,proto3" json:"addresses,omitempty"` // 对端网络地址
|
||||
Database string `protobuf:"bytes,2,opt,name=database,proto3" json:"database,omitempty"` // 数据库名
|
||||
Username string `protobuf:"bytes,3,opt,name=username,proto3" json:"username,omitempty"` // 用户名
|
||||
Password string `protobuf:"bytes,4,opt,name=password,proto3" json:"password,omitempty"` // 密码
|
||||
Debug bool `protobuf:"varint,5,opt,name=debug,proto3" json:"debug,omitempty"` // 调试开关
|
||||
Protocol string `protobuf:"bytes,6,opt,name=protocol,proto3" json:"protocol,omitempty"` // 协议:http、https、tcp、native
|
||||
Tls *TLS `protobuf:"bytes,7,opt,name=tls,proto3" json:"tls,omitempty"` // TLS配置
|
||||
CompressionMethod string `protobuf:"bytes,10,opt,name=compression_method,json=compressionMethod,proto3" json:"compression_method,omitempty"` // 压缩方法:lz4、zstd、none
|
||||
ConnOpenStrategy string `protobuf:"bytes,11,opt,name=conn_open_strategy,json=connOpenStrategy,proto3" json:"conn_open_strategy,omitempty"` // 连接打开策略:default、lazy、always
|
||||
DialTimeout *durationpb.Duration `protobuf:"bytes,20,opt,name=dial_timeout,json=dialTimeout,proto3" json:"dial_timeout,omitempty"`
|
||||
ConnMaxLifeTime *durationpb.Duration `protobuf:"bytes,21,opt,name=conn_max_life_time,json=connMaxLifeTime,proto3" json:"conn_max_life_time,omitempty"`
|
||||
ConnectionMaxLifetime *durationpb.Duration `protobuf:"bytes,22,opt,name=connection_max_lifetime,json=connectionMaxLifetime,proto3" json:"connection_max_lifetime,omitempty"` // 连接可重用的最大时间长度
|
||||
MaxExecutionTime int32 `protobuf:"varint,23,opt,name=max_execution_time,json=maxExecutionTime,proto3" json:"max_execution_time,omitempty"` // 最大执行时间(秒)
|
||||
MaxOpenConns int32 `protobuf:"varint,30,opt,name=max_open_conns,json=maxOpenConns,proto3" json:"max_open_conns,omitempty"` // 连接池最大打开连接数
|
||||
MaxIdleConns int32 `protobuf:"varint,31,opt,name=max_idle_conns,json=maxIdleConns,proto3" json:"max_idle_conns,omitempty"` // 连接池最大空闲连接数
|
||||
MaxIdleConnections int32 `protobuf:"varint,32,opt,name=max_idle_connections,json=maxIdleConnections,proto3" json:"max_idle_connections,omitempty"` // 连接池最大空闲连接数
|
||||
MaxOpenConnections int32 `protobuf:"varint,33,opt,name=max_open_connections,json=maxOpenConnections,proto3" json:"max_open_connections,omitempty"` // 连接池最大打开连接数
|
||||
BlockBufferSize int32 `protobuf:"varint,40,opt,name=block_buffer_size,json=blockBufferSize,proto3" json:"block_buffer_size,omitempty"` // 数据块缓冲区大小
|
||||
MaxCompressionBuffer int32 `protobuf:"varint,41,opt,name=max_compression_buffer,json=maxCompressionBuffer,proto3" json:"max_compression_buffer,omitempty"` // 最大压缩缓冲区大小
|
||||
EnableTracing bool `protobuf:"varint,100,opt,name=enable_tracing,json=enableTracing,proto3" json:"enable_tracing,omitempty"` // 打开链路追踪
|
||||
EnableMetrics bool `protobuf:"varint,101,opt,name=enable_metrics,json=enableMetrics,proto3" json:"enable_metrics,omitempty"` // 打开性能度量
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
Addresses []string `protobuf:"bytes,1,rep,name=addresses,proto3" json:"addresses,omitempty"` // 对端网络地址
|
||||
Database *string `protobuf:"bytes,2,opt,name=database,proto3,oneof" json:"database,omitempty"` // 数据库名
|
||||
Username *string `protobuf:"bytes,3,opt,name=username,proto3,oneof" json:"username,omitempty"` // 用户名
|
||||
Password *string `protobuf:"bytes,4,opt,name=password,proto3,oneof" json:"password,omitempty"` // 密码
|
||||
Debug *bool `protobuf:"varint,5,opt,name=debug,proto3,oneof" json:"debug,omitempty"` // 调试开关
|
||||
Scheme *string `protobuf:"bytes,6,opt,name=scheme,proto3,oneof" json:"scheme,omitempty"` // 协议:http、https、native
|
||||
Tls *TLS `protobuf:"bytes,7,opt,name=tls,proto3,oneof" json:"tls,omitempty"` // TLS配置
|
||||
BlockBufferSize *int32 `protobuf:"varint,8,opt,name=block_buffer_size,json=blockBufferSize,proto3,oneof" json:"block_buffer_size,omitempty"` // 数据块缓冲区大小
|
||||
CompressionMethod *string `protobuf:"bytes,10,opt,name=compression_method,json=compressionMethod,proto3,oneof" json:"compression_method,omitempty"` // 压缩方法:zstd、lz4、lz4hc、gzip、deflate、br、none
|
||||
CompressionLevel *int32 `protobuf:"varint,11,opt,name=compression_level,json=compressionLevel,proto3,oneof" json:"compression_level,omitempty"` // 压缩级别:0-9
|
||||
MaxCompressionBuffer *int32 `protobuf:"varint,12,opt,name=max_compression_buffer,json=maxCompressionBuffer,proto3,oneof" json:"max_compression_buffer,omitempty"` // 最大压缩缓冲区大小
|
||||
ConnectionOpenStrategy *string `protobuf:"bytes,20,opt,name=connection_open_strategy,json=connectionOpenStrategy,proto3,oneof" json:"connection_open_strategy,omitempty"` // 连接打开策略:in_order、round_robin、random
|
||||
DialTimeout *durationpb.Duration `protobuf:"bytes,30,opt,name=dial_timeout,json=dialTimeout,proto3,oneof" json:"dial_timeout,omitempty"` // 连接超时时间
|
||||
ReadTimeout *durationpb.Duration `protobuf:"bytes,31,opt,name=read_timeout,json=readTimeout,proto3,oneof" json:"read_timeout,omitempty"` // 读取超时时间
|
||||
ConnMaxLifetime *durationpb.Duration `protobuf:"bytes,32,opt,name=conn_max_lifetime,json=connMaxLifetime,proto3,oneof" json:"conn_max_lifetime,omitempty"` // 连接可重用的最大时间长度
|
||||
MaxIdleConns *int32 `protobuf:"varint,40,opt,name=max_idle_conns,json=maxIdleConns,proto3,oneof" json:"max_idle_conns,omitempty"` // 连接池最大空闲连接数
|
||||
MaxOpenConns *int32 `protobuf:"varint,41,opt,name=max_open_conns,json=maxOpenConns,proto3,oneof" json:"max_open_conns,omitempty"` // 连接池最大打开连接数
|
||||
Dsn *string `protobuf:"bytes,50,opt,name=dsn,proto3,oneof" json:"dsn,omitempty"` // 数据源名称(DSN字符串)
|
||||
HttpProxy *string `protobuf:"bytes,60,opt,name=http_proxy,json=httpProxy,proto3,oneof" json:"http_proxy,omitempty"` // HTTP代理地址
|
||||
EnableTracing *bool `protobuf:"varint,100,opt,name=enable_tracing,json=enableTracing,proto3,oneof" json:"enable_tracing,omitempty"` // 打开链路追踪
|
||||
EnableMetrics *bool `protobuf:"varint,101,opt,name=enable_metrics,json=enableMetrics,proto3,oneof" json:"enable_metrics,omitempty"` // 打开性能度量
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) Reset() {
|
||||
@@ -668,36 +668,36 @@ func (x *Data_ClickHouse) GetAddresses() []string {
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetDatabase() string {
|
||||
if x != nil {
|
||||
return x.Database
|
||||
if x != nil && x.Database != nil {
|
||||
return *x.Database
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetUsername() string {
|
||||
if x != nil {
|
||||
return x.Username
|
||||
if x != nil && x.Username != nil {
|
||||
return *x.Username
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetPassword() string {
|
||||
if x != nil {
|
||||
return x.Password
|
||||
if x != nil && x.Password != nil {
|
||||
return *x.Password
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetDebug() bool {
|
||||
if x != nil {
|
||||
return x.Debug
|
||||
if x != nil && x.Debug != nil {
|
||||
return *x.Debug
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetProtocol() string {
|
||||
if x != nil {
|
||||
return x.Protocol
|
||||
func (x *Data_ClickHouse) GetScheme() string {
|
||||
if x != nil && x.Scheme != nil {
|
||||
return *x.Scheme
|
||||
}
|
||||
return ""
|
||||
}
|
||||
@@ -709,16 +709,37 @@ func (x *Data_ClickHouse) GetTls() *TLS {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetBlockBufferSize() int32 {
|
||||
if x != nil && x.BlockBufferSize != nil {
|
||||
return *x.BlockBufferSize
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetCompressionMethod() string {
|
||||
if x != nil {
|
||||
return x.CompressionMethod
|
||||
if x != nil && x.CompressionMethod != nil {
|
||||
return *x.CompressionMethod
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetConnOpenStrategy() string {
|
||||
if x != nil {
|
||||
return x.ConnOpenStrategy
|
||||
func (x *Data_ClickHouse) GetCompressionLevel() int32 {
|
||||
if x != nil && x.CompressionLevel != nil {
|
||||
return *x.CompressionLevel
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetMaxCompressionBuffer() int32 {
|
||||
if x != nil && x.MaxCompressionBuffer != nil {
|
||||
return *x.MaxCompressionBuffer
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetConnectionOpenStrategy() string {
|
||||
if x != nil && x.ConnectionOpenStrategy != nil {
|
||||
return *x.ConnectionOpenStrategy
|
||||
}
|
||||
return ""
|
||||
}
|
||||
@@ -730,79 +751,58 @@ func (x *Data_ClickHouse) GetDialTimeout() *durationpb.Duration {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetConnMaxLifeTime() *durationpb.Duration {
|
||||
func (x *Data_ClickHouse) GetReadTimeout() *durationpb.Duration {
|
||||
if x != nil {
|
||||
return x.ConnMaxLifeTime
|
||||
return x.ReadTimeout
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetConnectionMaxLifetime() *durationpb.Duration {
|
||||
func (x *Data_ClickHouse) GetConnMaxLifetime() *durationpb.Duration {
|
||||
if x != nil {
|
||||
return x.ConnectionMaxLifetime
|
||||
return x.ConnMaxLifetime
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetMaxExecutionTime() int32 {
|
||||
if x != nil {
|
||||
return x.MaxExecutionTime
|
||||
func (x *Data_ClickHouse) GetMaxIdleConns() int32 {
|
||||
if x != nil && x.MaxIdleConns != nil {
|
||||
return *x.MaxIdleConns
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetMaxOpenConns() int32 {
|
||||
if x != nil {
|
||||
return x.MaxOpenConns
|
||||
if x != nil && x.MaxOpenConns != nil {
|
||||
return *x.MaxOpenConns
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetMaxIdleConns() int32 {
|
||||
if x != nil {
|
||||
return x.MaxIdleConns
|
||||
func (x *Data_ClickHouse) GetDsn() string {
|
||||
if x != nil && x.Dsn != nil {
|
||||
return *x.Dsn
|
||||
}
|
||||
return 0
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetMaxIdleConnections() int32 {
|
||||
if x != nil {
|
||||
return x.MaxIdleConnections
|
||||
func (x *Data_ClickHouse) GetHttpProxy() string {
|
||||
if x != nil && x.HttpProxy != nil {
|
||||
return *x.HttpProxy
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetMaxOpenConnections() int32 {
|
||||
if x != nil {
|
||||
return x.MaxOpenConnections
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetBlockBufferSize() int32 {
|
||||
if x != nil {
|
||||
return x.BlockBufferSize
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetMaxCompressionBuffer() int32 {
|
||||
if x != nil {
|
||||
return x.MaxCompressionBuffer
|
||||
}
|
||||
return 0
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetEnableTracing() bool {
|
||||
if x != nil {
|
||||
return x.EnableTracing
|
||||
if x != nil && x.EnableTracing != nil {
|
||||
return *x.EnableTracing
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (x *Data_ClickHouse) GetEnableMetrics() bool {
|
||||
if x != nil {
|
||||
return x.EnableMetrics
|
||||
if x != nil && x.EnableMetrics != nil {
|
||||
return *x.EnableMetrics
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -1843,7 +1843,7 @@ var File_conf_v1_kratos_conf_data_proto protoreflect.FileDescriptor
|
||||
|
||||
const file_conf_v1_kratos_conf_data_proto_rawDesc = "" +
|
||||
"\n" +
|
||||
"\x1econf/v1/kratos_conf_data.proto\x12\x04conf\x1a\x1egoogle/protobuf/duration.proto\x1a\x1dconf/v1/kratos_conf_tls.proto\"\xc75\n" +
|
||||
"\x1econf/v1/kratos_conf_data.proto\x12\x04conf\x1a\x1egoogle/protobuf/duration.proto\x1a\x1dconf/v1/kratos_conf_tls.proto\"\xc38\n" +
|
||||
"\x04Data\x124\n" +
|
||||
"\bdatabase\x18\x01 \x01(\v2\x13.conf.Data.DatabaseH\x00R\bdatabase\x88\x01\x01\x12+\n" +
|
||||
"\x05redis\x18\n" +
|
||||
@@ -1922,31 +1922,54 @@ const file_conf_v1_kratos_conf_data_proto_rawDesc = "" +
|
||||
"\t_usernameB\v\n" +
|
||||
"\t_passwordB\x11\n" +
|
||||
"\x0f_auth_mechanismB\x0e\n" +
|
||||
"\f_auth_source\x1a\x91\a\n" +
|
||||
"\f_auth_source\x1a\x8d\n" +
|
||||
"\n" +
|
||||
"\n" +
|
||||
"ClickHouse\x12\x1c\n" +
|
||||
"\taddresses\x18\x01 \x03(\tR\taddresses\x12\x1a\n" +
|
||||
"\bdatabase\x18\x02 \x01(\tR\bdatabase\x12\x1a\n" +
|
||||
"\busername\x18\x03 \x01(\tR\busername\x12\x1a\n" +
|
||||
"\bpassword\x18\x04 \x01(\tR\bpassword\x12\x14\n" +
|
||||
"\x05debug\x18\x05 \x01(\bR\x05debug\x12\x1a\n" +
|
||||
"\bprotocol\x18\x06 \x01(\tR\bprotocol\x12\x1b\n" +
|
||||
"\x03tls\x18\a \x01(\v2\t.conf.TLSR\x03tls\x12-\n" +
|
||||
"\taddresses\x18\x01 \x03(\tR\taddresses\x12\x1f\n" +
|
||||
"\bdatabase\x18\x02 \x01(\tH\x00R\bdatabase\x88\x01\x01\x12\x1f\n" +
|
||||
"\busername\x18\x03 \x01(\tH\x01R\busername\x88\x01\x01\x12\x1f\n" +
|
||||
"\bpassword\x18\x04 \x01(\tH\x02R\bpassword\x88\x01\x01\x12\x19\n" +
|
||||
"\x05debug\x18\x05 \x01(\bH\x03R\x05debug\x88\x01\x01\x12\x1b\n" +
|
||||
"\x06scheme\x18\x06 \x01(\tH\x04R\x06scheme\x88\x01\x01\x12 \n" +
|
||||
"\x03tls\x18\a \x01(\v2\t.conf.TLSH\x05R\x03tls\x88\x01\x01\x12/\n" +
|
||||
"\x11block_buffer_size\x18\b \x01(\x05H\x06R\x0fblockBufferSize\x88\x01\x01\x122\n" +
|
||||
"\x12compression_method\x18\n" +
|
||||
" \x01(\tR\x11compressionMethod\x12,\n" +
|
||||
"\x12conn_open_strategy\x18\v \x01(\tR\x10connOpenStrategy\x12<\n" +
|
||||
"\fdial_timeout\x18\x14 \x01(\v2\x19.google.protobuf.DurationR\vdialTimeout\x12F\n" +
|
||||
"\x12conn_max_life_time\x18\x15 \x01(\v2\x19.google.protobuf.DurationR\x0fconnMaxLifeTime\x12Q\n" +
|
||||
"\x17connection_max_lifetime\x18\x16 \x01(\v2\x19.google.protobuf.DurationR\x15connectionMaxLifetime\x12,\n" +
|
||||
"\x12max_execution_time\x18\x17 \x01(\x05R\x10maxExecutionTime\x12$\n" +
|
||||
"\x0emax_open_conns\x18\x1e \x01(\x05R\fmaxOpenConns\x12$\n" +
|
||||
"\x0emax_idle_conns\x18\x1f \x01(\x05R\fmaxIdleConns\x120\n" +
|
||||
"\x14max_idle_connections\x18 \x01(\x05R\x12maxIdleConnections\x120\n" +
|
||||
"\x14max_open_connections\x18! \x01(\x05R\x12maxOpenConnections\x12*\n" +
|
||||
"\x11block_buffer_size\x18( \x01(\x05R\x0fblockBufferSize\x124\n" +
|
||||
"\x16max_compression_buffer\x18) \x01(\x05R\x14maxCompressionBuffer\x12%\n" +
|
||||
"\x0eenable_tracing\x18d \x01(\bR\renableTracing\x12%\n" +
|
||||
"\x0eenable_metrics\x18e \x01(\bR\renableMetrics\x1a\xe5\x02\n" +
|
||||
" \x01(\tH\aR\x11compressionMethod\x88\x01\x01\x120\n" +
|
||||
"\x11compression_level\x18\v \x01(\x05H\bR\x10compressionLevel\x88\x01\x01\x129\n" +
|
||||
"\x16max_compression_buffer\x18\f \x01(\x05H\tR\x14maxCompressionBuffer\x88\x01\x01\x12=\n" +
|
||||
"\x18connection_open_strategy\x18\x14 \x01(\tH\n" +
|
||||
"R\x16connectionOpenStrategy\x88\x01\x01\x12A\n" +
|
||||
"\fdial_timeout\x18\x1e \x01(\v2\x19.google.protobuf.DurationH\vR\vdialTimeout\x88\x01\x01\x12A\n" +
|
||||
"\fread_timeout\x18\x1f \x01(\v2\x19.google.protobuf.DurationH\fR\vreadTimeout\x88\x01\x01\x12J\n" +
|
||||
"\x11conn_max_lifetime\x18 \x01(\v2\x19.google.protobuf.DurationH\rR\x0fconnMaxLifetime\x88\x01\x01\x12)\n" +
|
||||
"\x0emax_idle_conns\x18( \x01(\x05H\x0eR\fmaxIdleConns\x88\x01\x01\x12)\n" +
|
||||
"\x0emax_open_conns\x18) \x01(\x05H\x0fR\fmaxOpenConns\x88\x01\x01\x12\x15\n" +
|
||||
"\x03dsn\x182 \x01(\tH\x10R\x03dsn\x88\x01\x01\x12\"\n" +
|
||||
"\n" +
|
||||
"http_proxy\x18< \x01(\tH\x11R\thttpProxy\x88\x01\x01\x12*\n" +
|
||||
"\x0eenable_tracing\x18d \x01(\bH\x12R\renableTracing\x88\x01\x01\x12*\n" +
|
||||
"\x0eenable_metrics\x18e \x01(\bH\x13R\renableMetrics\x88\x01\x01B\v\n" +
|
||||
"\t_databaseB\v\n" +
|
||||
"\t_usernameB\v\n" +
|
||||
"\t_passwordB\b\n" +
|
||||
"\x06_debugB\t\n" +
|
||||
"\a_schemeB\x06\n" +
|
||||
"\x04_tlsB\x14\n" +
|
||||
"\x12_block_buffer_sizeB\x15\n" +
|
||||
"\x13_compression_methodB\x14\n" +
|
||||
"\x12_compression_levelB\x19\n" +
|
||||
"\x17_max_compression_bufferB\x1b\n" +
|
||||
"\x19_connection_open_strategyB\x0f\n" +
|
||||
"\r_dial_timeoutB\x0f\n" +
|
||||
"\r_read_timeoutB\x14\n" +
|
||||
"\x12_conn_max_lifetimeB\x11\n" +
|
||||
"\x0f_max_idle_connsB\x11\n" +
|
||||
"\x0f_max_open_connsB\x06\n" +
|
||||
"\x04_dsnB\r\n" +
|
||||
"\v_http_proxyB\x11\n" +
|
||||
"\x0f_enable_tracingB\x11\n" +
|
||||
"\x0f_enable_metrics\x1a\xe5\x02\n" +
|
||||
"\bInfluxDB\x12\x12\n" +
|
||||
"\x04host\x18\x01 \x01(\tR\x04host\x12\x14\n" +
|
||||
"\x05token\x18\x02 \x01(\tR\x05token\x12\x1f\n" +
|
||||
@@ -2129,8 +2152,8 @@ var file_conf_v1_kratos_conf_data_proto_depIdxs = []int32{
|
||||
19, // 28: conf.Data.MongoDB.timeout:type_name -> google.protobuf.Duration
|
||||
20, // 29: conf.Data.ClickHouse.tls:type_name -> conf.TLS
|
||||
19, // 30: conf.Data.ClickHouse.dial_timeout:type_name -> google.protobuf.Duration
|
||||
19, // 31: conf.Data.ClickHouse.conn_max_life_time:type_name -> google.protobuf.Duration
|
||||
19, // 32: conf.Data.ClickHouse.connection_max_lifetime:type_name -> google.protobuf.Duration
|
||||
19, // 31: conf.Data.ClickHouse.read_timeout:type_name -> google.protobuf.Duration
|
||||
19, // 32: conf.Data.ClickHouse.conn_max_lifetime:type_name -> google.protobuf.Duration
|
||||
19, // 33: conf.Data.InfluxDB.timeout:type_name -> google.protobuf.Duration
|
||||
19, // 34: conf.Data.InfluxDB.idle_connection_timeout:type_name -> google.protobuf.Duration
|
||||
19, // 35: conf.Data.ElasticSearch.discover_nodes_interval:type_name -> google.protobuf.Duration
|
||||
@@ -2157,6 +2180,7 @@ func file_conf_v1_kratos_conf_data_proto_init() {
|
||||
file_conf_v1_kratos_conf_data_proto_msgTypes[0].OneofWrappers = []any{}
|
||||
file_conf_v1_kratos_conf_data_proto_msgTypes[1].OneofWrappers = []any{}
|
||||
file_conf_v1_kratos_conf_data_proto_msgTypes[3].OneofWrappers = []any{}
|
||||
file_conf_v1_kratos_conf_data_proto_msgTypes[4].OneofWrappers = []any{}
|
||||
type x struct{}
|
||||
out := protoimpl.TypeBuilder{
|
||||
File: protoimpl.DescBuilder{
|
||||
|
||||
@@ -191,6 +191,7 @@ type Logger_Logrus struct {
|
||||
TimestampFormat string `protobuf:"bytes,3,opt,name=timestamp_format,json=timestampFormat,proto3" json:"timestamp_format,omitempty"` // 定义时间戳格式,例如:"2006-01-02 15:04:05"
|
||||
DisableColors bool `protobuf:"varint,4,opt,name=disable_colors,json=disableColors,proto3" json:"disable_colors,omitempty"` // 不需要彩色日志
|
||||
DisableTimestamp bool `protobuf:"varint,5,opt,name=disable_timestamp,json=disableTimestamp,proto3" json:"disable_timestamp,omitempty"` // 不需要时间戳
|
||||
ForceColors bool `protobuf:"varint,6,opt,name=force_colors,json=forceColors,proto3" json:"force_colors,omitempty"` // 是否开启彩色日志
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -260,6 +261,13 @@ func (x *Logger_Logrus) GetDisableTimestamp() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (x *Logger_Logrus) GetForceColors() bool {
|
||||
if x != nil {
|
||||
return x.ForceColors
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Fluent
|
||||
type Logger_Fluent struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
@@ -447,7 +455,7 @@ var File_conf_v1_kratos_conf_logger_proto protoreflect.FileDescriptor
|
||||
|
||||
const file_conf_v1_kratos_conf_logger_proto_rawDesc = "" +
|
||||
"\n" +
|
||||
" conf/v1/kratos_conf_logger.proto\x12\x04conf\"\xc4\a\n" +
|
||||
" conf/v1/kratos_conf_logger.proto\x12\x04conf\"\xe7\a\n" +
|
||||
"\x06Logger\x12\x12\n" +
|
||||
"\x04type\x18\x01 \x01(\tR\x04type\x12'\n" +
|
||||
"\x03zap\x18\x02 \x01(\v2\x10.conf.Logger.ZapH\x00R\x03zap\x88\x01\x01\x120\n" +
|
||||
@@ -461,13 +469,14 @@ const file_conf_v1_kratos_conf_logger_proto_rawDesc = "" +
|
||||
"\bmax_size\x18\x03 \x01(\x05R\amaxSize\x12\x17\n" +
|
||||
"\amax_age\x18\x04 \x01(\x05R\x06maxAge\x12\x1f\n" +
|
||||
"\vmax_backups\x18\x05 \x01(\x05R\n" +
|
||||
"maxBackups\x1a\xbb\x01\n" +
|
||||
"maxBackups\x1a\xde\x01\n" +
|
||||
"\x06Logrus\x12\x14\n" +
|
||||
"\x05level\x18\x01 \x01(\tR\x05level\x12\x1c\n" +
|
||||
"\tformatter\x18\x02 \x01(\tR\tformatter\x12)\n" +
|
||||
"\x10timestamp_format\x18\x03 \x01(\tR\x0ftimestampFormat\x12%\n" +
|
||||
"\x0edisable_colors\x18\x04 \x01(\bR\rdisableColors\x12+\n" +
|
||||
"\x11disable_timestamp\x18\x05 \x01(\bR\x10disableTimestamp\x1a$\n" +
|
||||
"\x11disable_timestamp\x18\x05 \x01(\bR\x10disableTimestamp\x12!\n" +
|
||||
"\fforce_colors\x18\x06 \x01(\bR\vforceColors\x1a$\n" +
|
||||
"\x06Fluent\x12\x1a\n" +
|
||||
"\bendpoint\x18\x01 \x01(\tR\bendpoint\x1a\x82\x01\n" +
|
||||
"\x06Aliyun\x12\x1a\n" +
|
||||
|
||||
@@ -74,33 +74,36 @@ message Data {
|
||||
message ClickHouse {
|
||||
repeated string addresses = 1; // 对端网络地址
|
||||
|
||||
string database = 2; // 数据库名
|
||||
string username = 3; // 用户名
|
||||
string password = 4; // 密码
|
||||
optional string database = 2; // 数据库名
|
||||
optional string username = 3; // 用户名
|
||||
optional string password = 4; // 密码
|
||||
|
||||
bool debug = 5; // 调试开关
|
||||
string protocol = 6; // 协议:http、https、tcp、native
|
||||
optional bool debug = 5; // 调试开关
|
||||
optional string scheme = 6; // 协议:http、https、native
|
||||
|
||||
TLS tls = 7; // TLS配置
|
||||
optional TLS tls = 7; // TLS配置
|
||||
|
||||
string compression_method = 10; // 压缩方法:lz4、zstd、none
|
||||
string conn_open_strategy = 11; // 连接打开策略:default、lazy、always
|
||||
optional int32 block_buffer_size = 8; // 数据块缓冲区大小
|
||||
|
||||
google.protobuf.Duration dial_timeout = 20;
|
||||
google.protobuf.Duration conn_max_life_time = 21;
|
||||
google.protobuf.Duration connection_max_lifetime = 22; // 连接可重用的最大时间长度
|
||||
int32 max_execution_time = 23; // 最大执行时间(秒)
|
||||
optional string compression_method = 10; // 压缩方法:zstd、lz4、lz4hc、gzip、deflate、br、none
|
||||
optional int32 compression_level = 11; // 压缩级别:0-9
|
||||
optional int32 max_compression_buffer = 12; // 最大压缩缓冲区大小
|
||||
|
||||
int32 max_open_conns = 30; // 连接池最大打开连接数
|
||||
int32 max_idle_conns = 31; // 连接池最大空闲连接数
|
||||
int32 max_idle_connections = 32; // 连接池最大空闲连接数
|
||||
int32 max_open_connections = 33; // 连接池最大打开连接数
|
||||
optional string connection_open_strategy = 20; // 连接打开策略:in_order、round_robin、random
|
||||
|
||||
int32 block_buffer_size = 40; // 数据块缓冲区大小
|
||||
int32 max_compression_buffer = 41; // 最大压缩缓冲区大小
|
||||
optional google.protobuf.Duration dial_timeout = 30; // 连接超时时间
|
||||
optional google.protobuf.Duration read_timeout = 31; // 读取超时时间
|
||||
optional google.protobuf.Duration conn_max_lifetime = 32; // 连接可重用的最大时间长度
|
||||
|
||||
bool enable_tracing = 100; // 打开链路追踪
|
||||
bool enable_metrics = 101; // 打开性能度量
|
||||
optional int32 max_idle_conns = 40; // 连接池最大空闲连接数
|
||||
optional int32 max_open_conns = 41; // 连接池最大打开连接数
|
||||
|
||||
optional string dsn = 50; // 数据源名称(DSN字符串)
|
||||
|
||||
optional string http_proxy = 60; // HTTP代理地址
|
||||
|
||||
optional bool enable_tracing = 100; // 打开链路追踪
|
||||
optional bool enable_metrics = 101; // 打开性能度量
|
||||
}
|
||||
|
||||
// InfluxDB
|
||||
|
||||
@@ -22,6 +22,7 @@ message Logger {
|
||||
string timestamp_format = 3; // 定义时间戳格式,例如:"2006-01-02 15:04:05"
|
||||
bool disable_colors = 4; // 不需要彩色日志
|
||||
bool disable_timestamp = 5; // 不需要时间戳
|
||||
bool force_colors = 6; // 是否开启彩色日志
|
||||
}
|
||||
|
||||
// Fluent
|
||||
|
||||
18
database/clickhouse/README.md
Normal file
18
database/clickhouse/README.md
Normal file
@@ -0,0 +1,18 @@
|
||||
# ClickHouse
|
||||
|
||||
## Docker部署
|
||||
|
||||
```bash
|
||||
docker pull bitnami/clickhouse:latest
|
||||
|
||||
docker run -itd \
|
||||
--name clickhouse-server \
|
||||
--network=app-tier \
|
||||
-p 8123:8123 \
|
||||
-p 9000:9000 \
|
||||
-p 9004:9004 \
|
||||
-e ALLOW_EMPTY_PASSWORD=no \
|
||||
-e CLICKHOUSE_ADMIN_USER=default \
|
||||
-e CLICKHOUSE_ADMIN_PASSWORD=123456 \
|
||||
bitnami/clickhouse:latest
|
||||
```
|
||||
199
database/clickhouse/batch.go
Normal file
199
database/clickhouse/batch.go
Normal file
@@ -0,0 +1,199 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
clickhouseV2 "github.com/ClickHouse/clickhouse-go/v2"
|
||||
driverV2 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
)
|
||||
|
||||
// BatchInserter 批量插入器
|
||||
type BatchInserter struct {
|
||||
conn clickhouseV2.Conn
|
||||
tableName string
|
||||
columns []string
|
||||
batchSize int
|
||||
rows []interface{}
|
||||
insertStmt string
|
||||
mu sync.Mutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewBatchInserter 创建新的批量插入器
|
||||
func NewBatchInserter(
|
||||
ctx context.Context,
|
||||
conn clickhouseV2.Conn,
|
||||
tableName string,
|
||||
batchSize int,
|
||||
columns []string,
|
||||
) (*BatchInserter, error) {
|
||||
if batchSize <= 0 {
|
||||
batchSize = 1000 // 默认批量大小
|
||||
}
|
||||
|
||||
if len(columns) == 0 {
|
||||
return nil, errors.New("必须指定列名")
|
||||
}
|
||||
|
||||
// 构建INSERT语句
|
||||
placeholders := make([]string, len(columns))
|
||||
for i := range placeholders {
|
||||
placeholders[i] = "?"
|
||||
}
|
||||
|
||||
insertStmt := fmt.Sprintf(
|
||||
"INSERT INTO %s (%s) VALUES (%s)",
|
||||
tableName,
|
||||
strings.Join(columns, ", "),
|
||||
strings.Join(placeholders, ", "),
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
return &BatchInserter{
|
||||
conn: conn,
|
||||
tableName: tableName,
|
||||
columns: columns,
|
||||
batchSize: batchSize,
|
||||
rows: make([]interface{}, 0, batchSize),
|
||||
insertStmt: insertStmt,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Add 添加数据行
|
||||
func (bi *BatchInserter) Add(row interface{}) error {
|
||||
bi.mu.Lock()
|
||||
defer bi.mu.Unlock()
|
||||
|
||||
// 检查上下文是否已取消
|
||||
if bi.ctx.Err() != nil {
|
||||
return bi.ctx.Err()
|
||||
}
|
||||
|
||||
bi.rows = append(bi.rows, row)
|
||||
|
||||
// 达到批量大小时自动提交
|
||||
if len(bi.rows) >= bi.batchSize {
|
||||
return bi.flush()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Flush 强制提交当前批次
|
||||
func (bi *BatchInserter) Flush() error {
|
||||
bi.mu.Lock()
|
||||
defer bi.mu.Unlock()
|
||||
|
||||
return bi.flush()
|
||||
}
|
||||
|
||||
// Close 关闭插入器并提交剩余数据
|
||||
func (bi *BatchInserter) Close() error {
|
||||
defer bi.cancel()
|
||||
|
||||
bi.mu.Lock()
|
||||
defer bi.mu.Unlock()
|
||||
|
||||
return bi.flush()
|
||||
}
|
||||
|
||||
// flush 内部提交方法
|
||||
func (bi *BatchInserter) flush() error {
|
||||
if len(bi.rows) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 创建批量
|
||||
batch, err := bi.conn.PrepareBatch(bi.ctx, bi.insertStmt)
|
||||
if err != nil {
|
||||
return ErrBatchPrepareFailed
|
||||
}
|
||||
|
||||
// 添加所有行
|
||||
for _, row := range bi.rows {
|
||||
// 使用反射获取字段值
|
||||
if err = appendStructToBatch(batch, row, bi.columns); err != nil {
|
||||
return ErrBatchAppendFailed
|
||||
}
|
||||
}
|
||||
|
||||
// 提交批量
|
||||
if err = batch.Send(); err != nil {
|
||||
return ErrBatchSendFailed
|
||||
}
|
||||
|
||||
// 清空批次
|
||||
bi.rows = bi.rows[:0]
|
||||
return nil
|
||||
}
|
||||
|
||||
// appendStructToBatch 使用反射将结构体字段添加到批次
|
||||
func appendStructToBatch(batch driverV2.Batch, obj interface{}, columns []string) error {
|
||||
v := reflect.ValueOf(obj)
|
||||
|
||||
// 如果是指针,获取指针指向的值
|
||||
if v.Kind() == reflect.Ptr {
|
||||
if v.IsNil() {
|
||||
return errors.New("nil指针")
|
||||
}
|
||||
v = v.Elem()
|
||||
}
|
||||
|
||||
// 必须是结构体
|
||||
if v.Kind() != reflect.Struct {
|
||||
return fmt.Errorf("期望结构体类型,得到 %v", v.Kind())
|
||||
}
|
||||
|
||||
// 获取结构体类型
|
||||
t := v.Type()
|
||||
|
||||
// 准备参数值
|
||||
values := make([]interface{}, len(columns))
|
||||
|
||||
// 映射列名到结构体字段
|
||||
for i, col := range columns {
|
||||
// 查找匹配的字段
|
||||
found := false
|
||||
for j := 0; j < v.NumField(); j++ {
|
||||
field := t.Field(j)
|
||||
|
||||
// 检查ch标签
|
||||
if tag := field.Tag.Get("ch"); strings.TrimSpace(tag) == col {
|
||||
values[i] = v.Field(j).Interface()
|
||||
found = true
|
||||
break
|
||||
}
|
||||
|
||||
// 检查json标签
|
||||
jsonTags := strings.Split(field.Tag.Get("json"), ",")
|
||||
if len(jsonTags) > 0 && strings.TrimSpace(jsonTags[0]) == col {
|
||||
values[i] = v.Field(j).Interface()
|
||||
found = true
|
||||
break
|
||||
}
|
||||
|
||||
// 检查字段名
|
||||
if field.Name == col {
|
||||
values[i] = v.Field(j).Interface()
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
return fmt.Errorf("未找到列 %s 对应的结构体字段", col)
|
||||
}
|
||||
}
|
||||
|
||||
// 添加到批次
|
||||
return batch.Append(values...)
|
||||
}
|
||||
@@ -1,55 +1,632 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
clickhouseV2 "github.com/ClickHouse/clickhouse-go/v2"
|
||||
driverV2 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
|
||||
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
|
||||
"github.com/tx7do/kratos-bootstrap/utils"
|
||||
)
|
||||
|
||||
func NewClickHouseClient(cfg *conf.Bootstrap, l *log.Helper) clickhouse.Conn {
|
||||
type Creator func() any
|
||||
|
||||
var compressionMap = map[string]clickhouseV2.CompressionMethod{
|
||||
"none": clickhouseV2.CompressionNone,
|
||||
"zstd": clickhouseV2.CompressionZSTD,
|
||||
"lz4": clickhouseV2.CompressionLZ4,
|
||||
"lz4hc": clickhouseV2.CompressionLZ4HC,
|
||||
"gzip": clickhouseV2.CompressionGZIP,
|
||||
"deflate": clickhouseV2.CompressionDeflate,
|
||||
"br": clickhouseV2.CompressionBrotli,
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
log *log.Helper
|
||||
|
||||
conn clickhouseV2.Conn
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func NewClient(logger log.Logger, cfg *conf.Bootstrap) (*Client, error) {
|
||||
c := &Client{
|
||||
log: log.NewHelper(log.With(logger, "module", "clickhouse-client")),
|
||||
}
|
||||
|
||||
if err := c.createClickHouseClient(cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// createClickHouseClient 创建ClickHouse客户端
|
||||
func (c *Client) createClickHouseClient(cfg *conf.Bootstrap) error {
|
||||
if cfg.Data == nil || cfg.Data.Clickhouse == nil {
|
||||
l.Warn("ClickHouse config is nil")
|
||||
return nil
|
||||
}
|
||||
|
||||
options := &clickhouse.Options{
|
||||
Addr: []string{cfg.Data.Clickhouse.Address},
|
||||
Auth: clickhouse.Auth{
|
||||
Database: cfg.Data.Clickhouse.Database,
|
||||
Username: cfg.Data.Clickhouse.Username,
|
||||
Password: cfg.Data.Clickhouse.Password,
|
||||
},
|
||||
Debug: cfg.Data.Clickhouse.Debug,
|
||||
DialTimeout: cfg.Data.Clickhouse.DialTimeout.AsDuration(),
|
||||
MaxOpenConns: int(cfg.Data.Clickhouse.MaxOpenConns),
|
||||
MaxIdleConns: int(cfg.Data.Clickhouse.MaxIdleConns),
|
||||
ConnMaxLifetime: cfg.Data.Clickhouse.ConnMaxLifeTime.AsDuration(),
|
||||
opts := &clickhouseV2.Options{}
|
||||
|
||||
if cfg.Data.Clickhouse.Dsn != nil {
|
||||
tmp, err := clickhouseV2.ParseDSN(cfg.Data.Clickhouse.GetDsn())
|
||||
if err != nil {
|
||||
c.log.Errorf("failed to parse clickhouse DSN: %v", err)
|
||||
return ErrInvalidDSN
|
||||
}
|
||||
opts = tmp
|
||||
}
|
||||
|
||||
if cfg.Data.Clickhouse.Addresses != nil {
|
||||
opts.Addr = cfg.Data.Clickhouse.GetAddresses()
|
||||
}
|
||||
|
||||
if cfg.Data.Clickhouse.Database != nil ||
|
||||
cfg.Data.Clickhouse.Username != nil ||
|
||||
cfg.Data.Clickhouse.Password != nil {
|
||||
opts.Auth = clickhouseV2.Auth{}
|
||||
|
||||
if cfg.Data.Clickhouse.Database != nil {
|
||||
opts.Auth.Database = cfg.Data.Clickhouse.GetDatabase()
|
||||
}
|
||||
if cfg.Data.Clickhouse.Username != nil {
|
||||
opts.Auth.Username = cfg.Data.Clickhouse.GetUsername()
|
||||
}
|
||||
if cfg.Data.Clickhouse.Password != nil {
|
||||
opts.Auth.Password = cfg.Data.Clickhouse.GetPassword()
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.Data.Clickhouse.Debug != nil {
|
||||
opts.Debug = cfg.Data.Clickhouse.GetDebug()
|
||||
}
|
||||
|
||||
if cfg.Data.Clickhouse.MaxOpenConns != nil {
|
||||
opts.MaxOpenConns = int(cfg.Data.Clickhouse.GetMaxOpenConns())
|
||||
}
|
||||
if cfg.Data.Clickhouse.MaxIdleConns != nil {
|
||||
opts.MaxIdleConns = int(cfg.Data.Clickhouse.GetMaxIdleConns())
|
||||
}
|
||||
|
||||
// 设置ssl
|
||||
if cfg.Data.Clickhouse.Tls != nil {
|
||||
var tlsCfg *tls.Config
|
||||
var err error
|
||||
|
||||
if tlsCfg, err = utils.LoadServerTlsConfig(cfg.Data.Clickhouse.Tls); err != nil {
|
||||
if tlsCfg, err = utils.LoadServerTlsConfig(cfg.Server.Grpc.Tls); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if tlsCfg != nil {
|
||||
options.TLS = tlsCfg
|
||||
opts.TLS = tlsCfg
|
||||
}
|
||||
}
|
||||
|
||||
conn, err := clickhouse.Open(options)
|
||||
if err != nil {
|
||||
l.Fatalf("failed opening connection to clickhouse: %v", err)
|
||||
return nil
|
||||
if cfg.Data.Clickhouse.CompressionMethod != nil || cfg.Data.Clickhouse.CompressionLevel != nil {
|
||||
opts.Compression = &clickhouseV2.Compression{}
|
||||
|
||||
if cfg.Data.Clickhouse.GetCompressionMethod() != "" {
|
||||
opts.Compression.Method = compressionMap[cfg.Data.Clickhouse.GetCompressionMethod()]
|
||||
}
|
||||
if cfg.Data.Clickhouse.CompressionLevel != nil {
|
||||
opts.Compression.Level = int(cfg.Data.Clickhouse.GetCompressionLevel())
|
||||
}
|
||||
}
|
||||
if cfg.Data.Clickhouse.MaxCompressionBuffer != nil {
|
||||
opts.MaxCompressionBuffer = int(cfg.Data.Clickhouse.GetMaxCompressionBuffer())
|
||||
}
|
||||
|
||||
return conn
|
||||
if cfg.Data.Clickhouse.DialTimeout != nil {
|
||||
opts.DialTimeout = cfg.Data.Clickhouse.GetDialTimeout().AsDuration()
|
||||
}
|
||||
if cfg.Data.Clickhouse.ReadTimeout != nil {
|
||||
opts.ReadTimeout = cfg.Data.Clickhouse.GetReadTimeout().AsDuration()
|
||||
}
|
||||
if cfg.Data.Clickhouse.ConnMaxLifetime != nil {
|
||||
opts.ConnMaxLifetime = cfg.Data.Clickhouse.GetConnMaxLifetime().AsDuration()
|
||||
}
|
||||
|
||||
if cfg.Data.Clickhouse.HttpProxy != nil {
|
||||
proxyURL, err := url.Parse(cfg.Data.Clickhouse.GetHttpProxy())
|
||||
if err != nil {
|
||||
c.log.Errorf("failed to parse HTTP proxy URL: %v", err)
|
||||
return ErrInvalidProxyURL
|
||||
}
|
||||
|
||||
opts.HTTPProxyURL = proxyURL
|
||||
}
|
||||
|
||||
if cfg.Data.Clickhouse.ConnectionOpenStrategy != nil {
|
||||
strategy := clickhouseV2.ConnOpenInOrder
|
||||
switch cfg.Data.Clickhouse.GetConnectionOpenStrategy() {
|
||||
case "in_order":
|
||||
strategy = clickhouseV2.ConnOpenInOrder
|
||||
case "round_robin":
|
||||
strategy = clickhouseV2.ConnOpenRoundRobin
|
||||
case "random":
|
||||
strategy = clickhouseV2.ConnOpenRandom
|
||||
}
|
||||
opts.ConnOpenStrategy = strategy
|
||||
}
|
||||
|
||||
if cfg.Data.Clickhouse.Scheme != nil {
|
||||
switch cfg.Data.Clickhouse.GetScheme() {
|
||||
case "http":
|
||||
opts.Protocol = clickhouseV2.HTTP
|
||||
case "https":
|
||||
opts.Protocol = clickhouseV2.HTTP
|
||||
default:
|
||||
opts.Protocol = clickhouseV2.Native
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.Data.Clickhouse.BlockBufferSize != nil {
|
||||
opts.BlockBufferSize = uint8(cfg.Data.Clickhouse.GetBlockBufferSize())
|
||||
}
|
||||
|
||||
// 创建ClickHouse连接
|
||||
conn, err := clickhouseV2.Open(opts)
|
||||
if err != nil {
|
||||
c.log.Errorf("failed to create clickhouse client: %v", err)
|
||||
return ErrConnectionFailed
|
||||
}
|
||||
|
||||
c.conn = conn
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭ClickHouse客户端连接
|
||||
func (c *Client) Close() {
|
||||
if c.conn == nil {
|
||||
c.log.Warn("clickhouse client is already closed or not initialized")
|
||||
return
|
||||
}
|
||||
|
||||
if err := c.conn.Close(); err != nil {
|
||||
c.log.Errorf("failed to close clickhouse client: %v", err)
|
||||
} else {
|
||||
c.log.Info("clickhouse client closed successfully")
|
||||
}
|
||||
}
|
||||
|
||||
// GetServerVersion 获取ClickHouse服务器版本
|
||||
func (c *Client) GetServerVersion() string {
|
||||
if c.conn == nil {
|
||||
c.log.Error("clickhouse client is not initialized")
|
||||
return ""
|
||||
}
|
||||
|
||||
version, err := c.conn.ServerVersion()
|
||||
if err != nil {
|
||||
c.log.Errorf("failed to get server version: %v", err)
|
||||
return ""
|
||||
} else {
|
||||
c.log.Infof("ClickHouse server version: %s", version)
|
||||
return version.String()
|
||||
}
|
||||
}
|
||||
|
||||
// CheckConnection 检查ClickHouse客户端连接是否正常
|
||||
func (c *Client) CheckConnection(ctx context.Context) error {
|
||||
if c.conn == nil {
|
||||
c.log.Error("clickhouse client is not initialized")
|
||||
return ErrClientNotInitialized
|
||||
}
|
||||
|
||||
if err := c.conn.Ping(ctx); err != nil {
|
||||
c.log.Errorf("ping failed: %v", err)
|
||||
return ErrPingFailed
|
||||
}
|
||||
|
||||
c.log.Info("clickhouse client connection is healthy")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Query 执行查询并返回结果
|
||||
func (c *Client) Query(ctx context.Context, creator Creator, results *[]any, query string, args ...any) error {
|
||||
if c.conn == nil {
|
||||
c.log.Error("clickhouse client is not initialized")
|
||||
return ErrClientNotInitialized
|
||||
}
|
||||
if creator == nil {
|
||||
c.log.Error("creator function cannot be nil")
|
||||
return ErrCreatorFunctionNil
|
||||
}
|
||||
|
||||
rows, err := c.conn.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
c.log.Errorf("query failed: %v", err)
|
||||
return ErrQueryExecutionFailed
|
||||
}
|
||||
defer func(rows driverV2.Rows) {
|
||||
if err = rows.Close(); err != nil {
|
||||
c.log.Errorf("failed to close rows: %v", err)
|
||||
}
|
||||
}(rows)
|
||||
|
||||
for rows.Next() {
|
||||
row := creator()
|
||||
if err = rows.ScanStruct(row); err != nil {
|
||||
c.log.Errorf("failed to scan row: %v", err)
|
||||
return ErrRowScanFailed
|
||||
}
|
||||
*results = append(*results, row)
|
||||
}
|
||||
|
||||
// 检查是否有未处理的错误
|
||||
if rows.Err() != nil {
|
||||
c.log.Errorf("Rows iteration error: %v", rows.Err())
|
||||
return ErrRowsIterationError
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// QueryRow 执行查询并返回单行结果
|
||||
func (c *Client) QueryRow(ctx context.Context, dest any, query string, args ...any) error {
|
||||
row := c.conn.QueryRow(ctx, query, args...)
|
||||
if row == nil {
|
||||
c.log.Error("query row returned nil")
|
||||
return ErrRowNotFound
|
||||
}
|
||||
|
||||
if err := row.ScanStruct(dest); err != nil {
|
||||
c.log.Errorf("")
|
||||
return ErrRowScanFailed
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Select 封装 SELECT 子句
|
||||
func (c *Client) Select(ctx context.Context, dest any, query string, args ...any) error {
|
||||
if c.conn == nil {
|
||||
c.log.Error("clickhouse client is not initialized")
|
||||
return ErrClientNotInitialized
|
||||
}
|
||||
|
||||
err := c.conn.Select(ctx, dest, query, args...)
|
||||
if err != nil {
|
||||
c.log.Errorf("select failed: %v", err)
|
||||
return ErrQueryExecutionFailed
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Exec 执行非查询语句
|
||||
func (c *Client) Exec(ctx context.Context, query string, args ...any) error {
|
||||
if c.conn == nil {
|
||||
c.log.Error("clickhouse client is not initialized")
|
||||
return ErrClientNotInitialized
|
||||
}
|
||||
|
||||
if err := c.conn.Exec(ctx, query, args...); err != nil {
|
||||
c.log.Errorf("exec failed: %v", err)
|
||||
return ErrExecutionFailed
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) prepareInsertData(data any) (string, string, []any, error) {
|
||||
val := reflect.ValueOf(data)
|
||||
if val.Kind() != reflect.Ptr || val.IsNil() {
|
||||
return "", "", nil, fmt.Errorf("data must be a non-nil pointer")
|
||||
}
|
||||
|
||||
val = val.Elem()
|
||||
typ := val.Type()
|
||||
|
||||
columns := make([]string, 0, typ.NumField())
|
||||
placeholders := make([]string, 0, typ.NumField())
|
||||
values := make([]any, 0, typ.NumField())
|
||||
|
||||
values = structToValueArray(data)
|
||||
|
||||
for i := 0; i < typ.NumField(); i++ {
|
||||
field := typ.Field(i)
|
||||
|
||||
// 优先获取 `ch` 标签,其次获取 `json` 标签,最后使用字段名
|
||||
columnName := field.Tag.Get("ch")
|
||||
if columnName == "" {
|
||||
jsonTag := field.Tag.Get("json")
|
||||
if jsonTag != "" {
|
||||
tags := strings.Split(jsonTag, ",") // 只取逗号前的部分
|
||||
if len(tags) > 0 {
|
||||
columnName = tags[0]
|
||||
}
|
||||
}
|
||||
}
|
||||
if columnName == "" {
|
||||
columnName = field.Name
|
||||
}
|
||||
//columnName = strings.TrimSpace(columnName)
|
||||
|
||||
columns = append(columns, columnName)
|
||||
placeholders = append(placeholders, "?")
|
||||
}
|
||||
|
||||
return strings.Join(columns, ", "), strings.Join(placeholders, ", "), values, nil
|
||||
}
|
||||
|
||||
// Insert 插入数据到指定表
|
||||
func (c *Client) Insert(ctx context.Context, tableName string, in any) error {
|
||||
if c.conn == nil {
|
||||
c.log.Error("clickhouse client is not initialized")
|
||||
return ErrClientNotInitialized
|
||||
}
|
||||
|
||||
columns, placeholders, values, err := c.prepareInsertData(in)
|
||||
if err != nil {
|
||||
c.log.Errorf("prepare insert in failed: %v", err)
|
||||
return ErrPrepareInsertDataFailed
|
||||
}
|
||||
|
||||
// 构造 SQL 语句
|
||||
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
|
||||
tableName,
|
||||
columns,
|
||||
placeholders,
|
||||
)
|
||||
|
||||
// 执行插入操作
|
||||
if err = c.conn.Exec(ctx, query, values...); err != nil {
|
||||
c.log.Errorf("insert failed: %v", err)
|
||||
return ErrInsertFailed
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) InsertMany(ctx context.Context, tableName string, data []any) error {
|
||||
if c.conn == nil {
|
||||
c.log.Error("clickhouse client is not initialized")
|
||||
return ErrClientNotInitialized
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
c.log.Error("data slice is empty")
|
||||
return ErrInvalidColumnData
|
||||
}
|
||||
|
||||
var columns string
|
||||
var placeholders []string
|
||||
var values []any
|
||||
|
||||
for _, item := range data {
|
||||
itemColumns, itemPlaceholders, itemValues, err := c.prepareInsertData(item)
|
||||
if err != nil {
|
||||
c.log.Errorf("prepare insert data failed: %v", err)
|
||||
return ErrPrepareInsertDataFailed
|
||||
}
|
||||
|
||||
if columns == "" {
|
||||
columns = itemColumns
|
||||
} else if columns != itemColumns {
|
||||
c.log.Error("data items have inconsistent columns")
|
||||
return ErrInvalidColumnData
|
||||
}
|
||||
|
||||
placeholders = append(placeholders, fmt.Sprintf("(%s)", itemPlaceholders))
|
||||
values = append(values, itemValues...)
|
||||
}
|
||||
|
||||
// 构造 SQL 语句
|
||||
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
|
||||
tableName,
|
||||
columns,
|
||||
strings.Join(placeholders, ", "),
|
||||
)
|
||||
|
||||
// 执行插入操作
|
||||
if err := c.conn.Exec(ctx, query, values...); err != nil {
|
||||
c.log.Errorf("insert many failed: %v", err)
|
||||
return ErrInsertFailed
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AsyncInsert 异步插入数据
|
||||
func (c *Client) AsyncInsert(ctx context.Context, tableName string, data any, wait bool) error {
|
||||
if c.conn == nil {
|
||||
c.log.Error("clickhouse client is not initialized")
|
||||
return ErrClientNotInitialized
|
||||
}
|
||||
|
||||
// 准备插入数据
|
||||
columns, placeholders, values, err := c.prepareInsertData(data)
|
||||
if err != nil {
|
||||
c.log.Errorf("prepare insert data failed: %v", err)
|
||||
return ErrPrepareInsertDataFailed
|
||||
}
|
||||
|
||||
// 构造 SQL 语句
|
||||
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
|
||||
tableName,
|
||||
columns,
|
||||
placeholders,
|
||||
)
|
||||
|
||||
// 执行异步插入
|
||||
if err = c.asyncInsert(ctx, query, wait, values...); err != nil {
|
||||
c.log.Errorf("async insert failed: %v", err)
|
||||
return ErrAsyncInsertFailed
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// asyncInsert 异步插入数据
|
||||
func (c *Client) asyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
|
||||
if c.conn == nil {
|
||||
c.log.Error("clickhouse client is not initialized")
|
||||
return ErrClientNotInitialized
|
||||
}
|
||||
|
||||
if err := c.conn.AsyncInsert(ctx, query, wait, args...); err != nil {
|
||||
c.log.Errorf("async insert failed: %v", err)
|
||||
return ErrAsyncInsertFailed
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AsyncInsertMany 批量异步插入数据
|
||||
func (c *Client) AsyncInsertMany(ctx context.Context, tableName string, data []any, wait bool) error {
|
||||
if c.conn == nil {
|
||||
c.log.Error("clickhouse client is not initialized")
|
||||
return ErrClientNotInitialized
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
c.log.Error("data slice is empty")
|
||||
return ErrInvalidColumnData
|
||||
}
|
||||
|
||||
// 准备插入数据的列名和占位符
|
||||
var columns string
|
||||
var placeholders []string
|
||||
var values []any
|
||||
|
||||
for _, item := range data {
|
||||
itemColumns, itemPlaceholders, itemValues, err := c.prepareInsertData(item)
|
||||
if err != nil {
|
||||
c.log.Errorf("prepare insert data failed: %v", err)
|
||||
return ErrPrepareInsertDataFailed
|
||||
}
|
||||
|
||||
if columns == "" {
|
||||
columns = itemColumns
|
||||
} else if columns != itemColumns {
|
||||
c.log.Error("data items have inconsistent columns")
|
||||
return ErrInvalidColumnData
|
||||
}
|
||||
|
||||
placeholders = append(placeholders, fmt.Sprintf("(%s)", itemPlaceholders))
|
||||
values = append(values, itemValues...)
|
||||
}
|
||||
|
||||
// 构造 SQL 语句
|
||||
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
|
||||
tableName,
|
||||
columns,
|
||||
strings.Join(placeholders, ", "),
|
||||
)
|
||||
|
||||
// 执行异步插入操作
|
||||
if err := c.asyncInsert(ctx, query, wait, values...); err != nil {
|
||||
c.log.Errorf("batch insert failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// BatchInsert 批量插入数据
|
||||
func (c *Client) BatchInsert(ctx context.Context, tableName string, data []any) error {
|
||||
if c.conn == nil {
|
||||
c.log.Error("clickhouse client is not initialized")
|
||||
return ErrClientNotInitialized
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
c.log.Error("data slice is empty")
|
||||
return ErrInvalidColumnData
|
||||
}
|
||||
|
||||
// 准备插入数据的列名和占位符
|
||||
var columns string
|
||||
var values [][]any
|
||||
|
||||
for _, item := range data {
|
||||
itemColumns, _, itemValues, err := c.prepareInsertData(item)
|
||||
if err != nil {
|
||||
c.log.Errorf("prepare insert data failed: %v", err)
|
||||
return ErrPrepareInsertDataFailed
|
||||
}
|
||||
|
||||
if columns == "" {
|
||||
columns = itemColumns
|
||||
} else if columns != itemColumns {
|
||||
c.log.Error("data items have inconsistent columns")
|
||||
return ErrInvalidColumnData
|
||||
}
|
||||
|
||||
values = append(values, itemValues)
|
||||
}
|
||||
|
||||
// 构造 SQL 语句
|
||||
query := fmt.Sprintf("INSERT INTO %s (%s) VALUES", tableName, columns)
|
||||
|
||||
// 调用 batchExec 方法执行批量插入
|
||||
if err := c.batchExec(ctx, query, values); err != nil {
|
||||
c.log.Errorf("batch insert failed: %v", err)
|
||||
return ErrBatchInsertFailed
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// batchExec 执行批量操作
|
||||
func (c *Client) batchExec(ctx context.Context, query string, data [][]any) error {
|
||||
batch, err := c.conn.PrepareBatch(ctx, query)
|
||||
if err != nil {
|
||||
c.log.Errorf("failed to prepare batch: %v", err)
|
||||
return ErrBatchPrepareFailed
|
||||
}
|
||||
|
||||
for _, row := range data {
|
||||
if err = batch.Append(row...); err != nil {
|
||||
c.log.Errorf("failed to append batch data: %v", err)
|
||||
return ErrBatchAppendFailed
|
||||
}
|
||||
}
|
||||
|
||||
if err = batch.Send(); err != nil {
|
||||
c.log.Errorf("failed to send batch: %v", err)
|
||||
return ErrBatchSendFailed
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// BatchStructs 批量插入结构体数据
|
||||
func (c *Client) BatchStructs(ctx context.Context, query string, data []any) error {
|
||||
if c.conn == nil {
|
||||
c.log.Error("clickhouse client is not initialized")
|
||||
return ErrClientNotInitialized
|
||||
}
|
||||
|
||||
// 准备批量插入
|
||||
batch, err := c.conn.PrepareBatch(ctx, query)
|
||||
if err != nil {
|
||||
c.log.Errorf("failed to prepare batch: %v", err)
|
||||
return ErrBatchPrepareFailed
|
||||
}
|
||||
|
||||
// 遍历数据并添加到批量插入
|
||||
for _, row := range data {
|
||||
if err := batch.AppendStruct(row); err != nil {
|
||||
c.log.Errorf("failed to append batch struct data: %v", err)
|
||||
return ErrBatchAppendFailed
|
||||
}
|
||||
}
|
||||
|
||||
// 发送批量插入
|
||||
if err = batch.Send(); err != nil {
|
||||
c.log.Errorf("failed to send batch: %v", err)
|
||||
return ErrBatchSendFailed
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
500
database/clickhouse/client_test.go
Normal file
500
database/clickhouse/client_test.go
Normal file
@@ -0,0 +1,500 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
"github.com/stretchr/testify/assert"
|
||||
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
|
||||
)
|
||||
|
||||
// Candle is the test fixture row mapped to the `candles` table. Pointer
// fields let a scan distinguish a missing value from a zero value.
type Candle struct {
	Timestamp *time.Time `json:"timestamp" ch:"timestamp"`
	Symbol    *string    `json:"symbol" ch:"symbol"`
	Open      *float64   `json:"open" ch:"open"`
	High      *float64   `json:"high" ch:"high"`
	Low       *float64   `json:"low" ch:"low"`
	Close     *float64   `json:"close" ch:"close"`
	Volume    *float64   `json:"volume" ch:"volume"`
}
|
||||
|
||||
// createTestClient builds a Client against a local ClickHouse instance.
// The constructor error is intentionally discarded; connectivity problems
// surface in the individual integration tests.
func createTestClient() *Client {
	cli, _ := NewClient(
		log.DefaultLogger,
		&conf.Bootstrap{
			Data: &conf.Data{
				Clickhouse: &conf.Data_ClickHouse{
					Addresses: []string{"localhost:9000"},
					Database:  Ptr("finances"),
					Username:  Ptr("default"),
					Password:  Ptr("*Abcd123456"),
				},
			},
		},
	)
	return cli
}
|
||||
|
||||
// createCandlesTable creates the `candles` MergeTree table used by the
// integration tests, ignoring (but logging) any failure.
func createCandlesTable(client *Client) {
	// SQL statement that creates the table if it does not exist yet.
	createTableQuery := `
		CREATE TABLE IF NOT EXISTS candles (
			timestamp DateTime64(3),
			symbol String,
			open Float64,
			high Float64,
			low Float64,
			close Float64,
			volume Float64
		) ENGINE = MergeTree()
		ORDER BY timestamp
	`
	err := client.Exec(context.Background(), createTableQuery)
	if err != nil {
		log.Errorf("Failed to create candles table: %v", err)
		return
	}
}
|
||||
|
||||
// TestNewClient verifies connectivity and server-version retrieval against a
// live local ClickHouse instance (integration test).
func TestNewClient(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	// Exercise CheckConnection.
	err := client.CheckConnection(context.Background())
	assert.NoError(t, err, "CheckConnection 应该成功执行")

	// Exercise GetServerVersion.
	version := client.GetServerVersion()
	assert.NotEmpty(t, version, "GetServerVersion 应该返回非空值")

	createCandlesTable(client)
}
|
||||
|
||||
// TestInsertCandlesTable inserts a single Candle row (integration test).
func TestInsertCandlesTable(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	createCandlesTable(client)

	// Test fixture row.
	candle := &Candle{
		Timestamp: Ptr(time.Now()),
		Symbol:    Ptr("AAPL"),
		Open:      Ptr(100.5),
		High:      Ptr(105.0),
		Low:       Ptr(99.5),
		Close:     Ptr(102.0),
		Volume:    Ptr(1500.0),
	}

	// Insert the row.
	err := client.Insert(context.Background(), "candles", candle)
	assert.NoError(t, err, "InsertCandlesTable 应该成功执行")
}
|
||||
|
||||
// TestInsertManyCandlesTable inserts two Candle rows via InsertMany
// (integration test).
func TestInsertManyCandlesTable(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	createCandlesTable(client)

	// Test fixture rows.
	data := []any{
		&Candle{
			Timestamp: Ptr(time.Now()),
			Symbol:    Ptr("AAPL"),
			Open:      Ptr(100.5),
			High:      Ptr(105.0),
			Low:       Ptr(99.5),
			Close:     Ptr(102.0),
			Volume:    Ptr(1500.0),
		},
		&Candle{
			Timestamp: Ptr(time.Now()),
			Symbol:    Ptr("GOOG"),
			Open:      Ptr(200.5),
			High:      Ptr(205.0),
			Low:       Ptr(199.5),
			Close:     Ptr(202.0),
			Volume:    Ptr(2500.0),
		},
	}

	// Insert the rows.
	err := client.InsertMany(context.Background(), "candles", data)
	assert.NoError(t, err, "InsertManyCandlesTable 应该成功执行")
}
|
||||
|
||||
// TestAsyncInsertCandlesTable inserts one row via AsyncInsert (wait=true)
// and reads it back with QueryRow (integration test).
func TestAsyncInsertCandlesTable(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	createCandlesTable(client)

	// Test fixture row.
	candle := &Candle{
		Timestamp: Ptr(time.Now()),
		Symbol:    Ptr("BTC/USD"),
		Open:      Ptr(30000.0),
		High:      Ptr(31000.0),
		Low:       Ptr(29000.0),
		Close:     Ptr(30500.0),
		Volume:    Ptr(500.0),
	}

	// Insert the row asynchronously, waiting for the server ack.
	err := client.AsyncInsert(context.Background(), "candles", candle, true)
	assert.NoError(t, err, "AsyncInsert 方法应该成功执行")

	// Verify the inserted row.
	query := `
		SELECT timestamp, symbol, open, high, low, close, volume
		FROM candles
		WHERE symbol = ?
	`
	var result Candle
	err = client.QueryRow(context.Background(), &result, query, "BTC/USD")
	assert.NoError(t, err, "QueryRow 应该成功执行")
	assert.Equal(t, "BTC/USD", *result.Symbol, "symbol 列值应该为 BTC/USD")
	assert.Equal(t, 30500.0, *result.Close, "close 列值应该为 30500.0")
	assert.Equal(t, 500.0, *result.Volume, "volume 列值应该为 500.0")
}
|
||||
|
||||
// TestAsyncInsertManyCandlesTable batch-inserts three rows via
// AsyncInsertMany and then selects them back (integration test).
func TestAsyncInsertManyCandlesTable(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	createCandlesTable(client)

	// Test fixture rows.
	data := []any{
		&Candle{
			Timestamp: Ptr(time.Now()),
			Symbol:    Ptr("AAPL"),
			Open:      Ptr(100.5),
			High:      Ptr(105.0),
			Low:       Ptr(99.5),
			Close:     Ptr(102.0),
			Volume:    Ptr(1500.0),
		},
		&Candle{
			Timestamp: Ptr(time.Now()),
			Symbol:    Ptr("GOOG"),
			Open:      Ptr(200.5),
			High:      Ptr(205.0),
			Low:       Ptr(199.5),
			Close:     Ptr(202.0),
			Volume:    Ptr(2500.0),
		},
		&Candle{
			Timestamp: Ptr(time.Now()),
			Symbol:    Ptr("MSFT"),
			Open:      Ptr(300.5),
			High:      Ptr(305.0),
			Low:       Ptr(299.5),
			Close:     Ptr(302.0),
			Volume:    Ptr(3500.0),
		},
	}

	// Batch-insert the rows.
	err := client.AsyncInsertMany(context.Background(), "candles", data, true)
	assert.NoError(t, err, "AsyncInsertMany 方法应该成功执行")

	// Verify the inserted rows.
	query := `
		SELECT timestamp, symbol, open, high, low, close, volume
		FROM candles
	`
	var results []Candle
	err = client.Select(context.Background(), &results, query)
	assert.NoError(t, err, "查询数据应该成功执行")
}
|
||||
|
||||
// TestInternalBatchExecCandlesTable drives the unexported batchExec helper
// directly with positional row values (integration test).
func TestInternalBatchExecCandlesTable(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	createCandlesTable(client)

	// INSERT statement used for the batch.
	insertQuery := `
		INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
		VALUES (?, ?, ?, ?, ?, ?, ?)
	`

	// Test fixture rows as positional value slices.
	data := [][]interface{}{
		{"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0},
		{"2023-10-01 12:01:00", "GOOG", 200.5, 205.0, 199.5, 202.0, 2500.0},
		{"2023-10-01 12:02:00", "MSFT", 300.5, 305.0, 299.5, 302.0, 3500.0},
	}

	// Batch-insert the rows.
	err := client.batchExec(context.Background(), insertQuery, data)
	assert.NoError(t, err, "batchExec 应该成功执行")
}
|
||||
|
||||
// TestBatchInsertCandlesTable batch-inserts three struct rows via
// BatchInsert and selects them back (integration test).
func TestBatchInsertCandlesTable(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	createCandlesTable(client)

	// Test fixture rows.
	data := []any{
		&Candle{
			Timestamp: Ptr(time.Now()),
			Symbol:    Ptr("AAPL"),
			Open:      Ptr(100.5),
			High:      Ptr(105.0),
			Low:       Ptr(99.5),
			Close:     Ptr(102.0),
			Volume:    Ptr(1500.0),
		},
		&Candle{
			Timestamp: Ptr(time.Now()),
			Symbol:    Ptr("GOOG"),
			Open:      Ptr(200.5),
			High:      Ptr(205.0),
			Low:       Ptr(199.5),
			Close:     Ptr(202.0),
			Volume:    Ptr(2500.0),
		},
		&Candle{
			Timestamp: Ptr(time.Now()),
			Symbol:    Ptr("MSFT"),
			Open:      Ptr(300.5),
			High:      Ptr(305.0),
			Low:       Ptr(299.5),
			Close:     Ptr(302.0),
			Volume:    Ptr(3500.0),
		},
	}

	// Batch-insert the rows.
	err := client.BatchInsert(context.Background(), "candles", data)
	assert.NoError(t, err, "BatchInsert 方法应该成功执行")

	// Verify the inserted rows.
	query := `
		SELECT timestamp, symbol, open, high, low, close, volume
		FROM candles
	`
	var results []Candle
	err = client.Select(context.Background(), &results, query)
	assert.NoError(t, err, "查询数据应该成功执行")
}
|
||||
|
||||
// TestBatchStructsCandlesTable batch-inserts struct rows via BatchStructs,
// which maps columns by struct tags (integration test).
func TestBatchStructsCandlesTable(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	createCandlesTable(client)

	// INSERT statement used for the batch.
	insertQuery := `
		INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
		VALUES (?, ?, ?, ?, ?, ?, ?)
	`

	// Test fixture rows.
	data := []any{
		&Candle{
			Timestamp: Ptr(time.Now()),
			Symbol:    Ptr("AAPL"),
			Open:      Ptr(100.5),
			High:      Ptr(105.0),
			Low:       Ptr(99.5),
			Close:     Ptr(102.0),
			Volume:    Ptr(1500.0),
		},
		&Candle{
			Timestamp: Ptr(time.Now()),
			Symbol:    Ptr("GOOG"),
			Open:      Ptr(200.5),
			High:      Ptr(205.0),
			Low:       Ptr(199.5),
			Close:     Ptr(202.0),
			Volume:    Ptr(2500.0),
		},
	}

	// Batch-insert the rows.
	err := client.BatchStructs(context.Background(), insertQuery, data)
	assert.NoError(t, err, "BatchStructsCandlesTable 应该成功执行")
}
|
||||
|
||||
// TestInternalAsyncInsertIntoCandlesTable drives the unexported asyncInsert
// helper directly with positional values (integration test).
func TestInternalAsyncInsertIntoCandlesTable(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	createCandlesTable(client)

	// INSERT statement with positional placeholders.
	insertQuery := `
		INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
		VALUES (?, ?, ?, ?, ?, ?, ?)
	`

	// Insert one row, waiting for the server ack.
	err := client.asyncInsert(context.Background(), insertQuery, true,
		"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0)
	assert.NoError(t, err, "InsertIntoCandlesTable 应该成功执行")
}
|
||||
|
||||
// TestQueryCandlesTable exercises Query with a creator callback and checks
// every scanned column is populated (integration test).
func TestQueryCandlesTable(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	createCandlesTable(client)

	// SELECT statement under test.
	query := `
		SELECT timestamp, symbol, open, high, low, close, volume
		FROM candles
	`

	// Result accumulator filled by Query.
	var results []any

	// Run the query; the creator returns a fresh *Candle per row.
	err := client.Query(context.Background(), func() interface{} { return &Candle{} }, &results, query)
	assert.NoError(t, err, "QueryCandlesTable 应该成功执行")
	assert.NotEmpty(t, results, "QueryCandlesTable 应该返回结果")
	for _, result := range results {
		candle, ok := result.(*Candle)
		assert.True(t, ok, "结果应该是 Candle 类型")
		assert.NotNil(t, candle.Timestamp, "Timestamp 列不应该为 nil")
		assert.NotNil(t, candle.Symbol, "Symbol 列不应该为 nil")
		assert.NotNil(t, candle.Open, "Open 列不应该为 nil")
		assert.NotNil(t, candle.High, "High 列不应该为 nil")
		assert.NotNil(t, candle.Low, "Low 列不应该为 nil")
		assert.NotNil(t, candle.Close, "Close 列不应该为 nil")
		assert.NotNil(t, candle.Volume, "Volume 列不应该为 nil")
		t.Logf("[%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
			candle.Timestamp.String(),
			*candle.Symbol,
			*candle.Open, *candle.High, *candle.Low, *candle.Close, *candle.Volume,
		)
	}
}
|
||||
|
||||
// TestSelectCandlesTable exercises Select into a typed slice and checks all
// columns are populated (integration test).
func TestSelectCandlesTable(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	createCandlesTable(client)

	// SELECT statement under test.
	query := `
		SELECT timestamp, symbol, open, high, low, close, volume
		FROM candles
	`

	// Result slice filled by Select.
	var results []Candle

	// Run the query.
	err := client.Select(context.Background(), &results, query)
	assert.NoError(t, err, "QueryCandlesTable 应该成功执行")
	assert.NotEmpty(t, results, "QueryCandlesTable 应该返回结果")

	for _, result := range results {
		assert.NotNil(t, result.Timestamp, "Timestamp 列不应该为 nil")
		assert.NotNil(t, result.Symbol, "Symbol 列不应该为 nil")
		assert.NotNil(t, result.Open, "Open 列不应该为 nil")
		assert.NotNil(t, result.High, "High 列不应该为 nil")
		assert.NotNil(t, result.Low, "Low 列不应该为 nil")
		assert.NotNil(t, result.Close, "Close 列不应该为 nil")
		assert.NotNil(t, result.Volume, "Volume 列不应该为 nil")
		t.Logf("[%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
			result.Timestamp.String(),
			*result.Symbol,
			*result.Open, *result.High, *result.Low, *result.Close, *result.Volume,
		)
	}
}
|
||||
|
||||
// TestQueryRow inserts a known row, reads it back with QueryRow, and checks
// the scanned values (integration test).
func TestQueryRow(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	createCandlesTable(client)

	// Insert a known fixture row.
	insertQuery := `
		INSERT INTO candles (timestamp, symbol, open, high, low, close, volume)
		VALUES (?, ?, ?, ?, ?, ?, ?)
	`
	err := client.asyncInsert(context.Background(), insertQuery, true,
		"2023-10-01 12:00:00", "AAPL", 100.5, 105.0, 99.5, 102.0, 1500.0)
	assert.NoError(t, err, "数据插入失败")

	// Read back a single row.
	query := `
		SELECT timestamp, symbol, open, high, low, close, volume
		FROM candles
		WHERE symbol = ?
	`

	var result Candle

	err = client.QueryRow(context.Background(), &result, query, "AAPL")
	assert.NoError(t, err, "QueryRow 应该成功执行")
	assert.Equal(t, "AAPL", *result.Symbol, "symbol 列值应该为 AAPL")
	assert.Equal(t, 100.5, *result.Open, "open 列值应该为 100.5")
	assert.Equal(t, 1500.0, *result.Volume, "volume 列值应该为 1500.0")
	t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
		result.Timestamp.String(),
		*result.Symbol,
		*result.Open, *result.High, *result.Low, *result.Close, *result.Volume,
	)
}
|
||||
|
||||
// TestDropCandlesTable drops the fixture table (integration cleanup test).
func TestDropCandlesTable(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	// SQL statement to drop the table.
	dropTableQuery := `DROP TABLE IF EXISTS candles`

	// Execute the drop.
	err := client.Exec(context.Background(), dropTableQuery)
	assert.NoError(t, err, "DropCandlesTable 应该成功执行")
}
|
||||
|
||||
// TestAggregateCandlesTable runs a GROUP BY aggregation and scans into an
// anonymous struct slice tagged with `ch` column names (integration test).
func TestAggregateCandlesTable(t *testing.T) {
	client := createTestClient()
	assert.NotNil(t, client)

	createCandlesTable(client)

	// Aggregation query under test.
	query := `
		SELECT symbol,
			MAX(high) AS max_high,
			MIN(low) AS min_low,
			AVG(close) AS avg_close,
			SUM(volume) AS total_volume
		FROM candles
		GROUP BY symbol
	`

	// Result rows keyed by the aliased aggregate columns.
	var results []struct {
		Symbol      string  `ch:"symbol"`
		MaxHigh     float64 `ch:"max_high"`
		MinLow      float64 `ch:"min_low"`
		AvgClose    float64 `ch:"avg_close"`
		TotalVolume float64 `ch:"total_volume"`
	}

	// Run the query.
	err := client.Select(context.Background(), &results, query)
	assert.NoError(t, err, "AggregateCandlesTable 应该成功执行")
	assert.NotEmpty(t, results, "AggregateCandlesTable 应该返回结果")
}
|
||||
89
database/clickhouse/errors.go
Normal file
89
database/clickhouse/errors.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package clickhouse
|
||||
|
||||
import "github.com/go-kratos/kratos/v2/errors"
|
||||
|
||||
// Sentinel errors for the ClickHouse client, expressed as kratos
// InternalServer errors so they carry a stable reason code that can be
// matched by callers and surfaced over transports.
var (
	// ErrInvalidColumnName is returned when an invalid column name is used.
	ErrInvalidColumnName = errors.InternalServer("INVALID_COLUMN_NAME", "invalid column name")

	// ErrInvalidTableName is returned when an invalid table name is used.
	ErrInvalidTableName = errors.InternalServer("INVALID_TABLE_NAME", "invalid table name")

	// ErrInvalidCondition is returned when an invalid condition is used in a query.
	ErrInvalidCondition = errors.InternalServer("INVALID_CONDITION", "invalid condition in query")

	// ErrQueryExecutionFailed is returned when a query execution fails.
	ErrQueryExecutionFailed = errors.InternalServer("QUERY_EXECUTION_FAILED", "query execution failed")

	// ErrExecutionFailed is returned when a general execution fails.
	ErrExecutionFailed = errors.InternalServer("EXECUTION_FAILED", "execution failed")

	// ErrAsyncInsertFailed is returned when an asynchronous insert operation fails.
	ErrAsyncInsertFailed = errors.InternalServer("ASYNC_INSERT_FAILED", "async insert operation failed")

	// ErrRowScanFailed is returned when scanning rows from a query result fails.
	ErrRowScanFailed = errors.InternalServer("ROW_SCAN_FAILED", "row scan failed")

	// ErrRowsIterationError is returned when there is an error iterating over rows.
	ErrRowsIterationError = errors.InternalServer("ROWS_ITERATION_ERROR", "rows iteration error")

	// ErrRowNotFound is returned when a specific row is not found in the result set.
	ErrRowNotFound = errors.InternalServer("ROW_NOT_FOUND", "row not found")

	// ErrConnectionFailed is returned when the connection to ClickHouse fails.
	ErrConnectionFailed = errors.InternalServer("CONNECTION_FAILED", "failed to connect to ClickHouse")

	// ErrDatabaseNotFound is returned when the specified database is not found.
	ErrDatabaseNotFound = errors.InternalServer("DATABASE_NOT_FOUND", "specified database not found")

	// ErrTableNotFound is returned when the specified table is not found.
	ErrTableNotFound = errors.InternalServer("TABLE_NOT_FOUND", "specified table not found")

	// ErrInsertFailed is returned when an insert operation fails.
	ErrInsertFailed = errors.InternalServer("INSERT_FAILED", "insert operation failed")

	// ErrUpdateFailed is returned when an update operation fails.
	ErrUpdateFailed = errors.InternalServer("UPDATE_FAILED", "update operation failed")

	// ErrDeleteFailed is returned when a delete operation fails.
	ErrDeleteFailed = errors.InternalServer("DELETE_FAILED", "delete operation failed")

	// ErrTransactionFailed is returned when a transaction fails.
	ErrTransactionFailed = errors.InternalServer("TRANSACTION_FAILED", "transaction failed")

	// ErrClientNotInitialized is returned when the ClickHouse client is not initialized.
	ErrClientNotInitialized = errors.InternalServer("CLIENT_NOT_INITIALIZED", "clickhouse client not initialized")

	// ErrGetServerVersionFailed is returned when getting the server version fails.
	ErrGetServerVersionFailed = errors.InternalServer("GET_SERVER_VERSION_FAILED", "failed to get server version")

	// ErrPingFailed is returned when a ping to the ClickHouse server fails.
	ErrPingFailed = errors.InternalServer("PING_FAILED", "ping to ClickHouse server failed")

	// ErrCreatorFunctionNil is returned when the creator function is nil.
	ErrCreatorFunctionNil = errors.InternalServer("CREATOR_FUNCTION_NIL", "creator function cannot be nil")

	// ErrBatchPrepareFailed is returned when a batch prepare operation fails.
	ErrBatchPrepareFailed = errors.InternalServer("BATCH_PREPARE_FAILED", "batch prepare operation failed")

	// ErrBatchSendFailed is returned when a batch send operation fails.
	ErrBatchSendFailed = errors.InternalServer("BATCH_SEND_FAILED", "batch send operation failed")

	// ErrBatchAppendFailed is returned when appending to a batch fails.
	ErrBatchAppendFailed = errors.InternalServer("BATCH_APPEND_FAILED", "batch append operation failed")

	// ErrBatchInsertFailed is returned when a batch insert operation fails.
	ErrBatchInsertFailed = errors.InternalServer("BATCH_INSERT_FAILED", "batch insert operation failed")

	// ErrInvalidDSN is returned when the data source name (DSN) is invalid.
	ErrInvalidDSN = errors.InternalServer("INVALID_DSN", "invalid data source name")

	// ErrInvalidProxyURL is returned when the proxy URL is invalid.
	ErrInvalidProxyURL = errors.InternalServer("INVALID_PROXY_URL", "invalid proxy URL")

	// ErrPrepareInsertDataFailed is returned when preparing insert data fails.
	ErrPrepareInsertDataFailed = errors.InternalServer("PREPARE_INSERT_DATA_FAILED", "failed to prepare insert data")

	// ErrInvalidColumnData is returned when the column data type is invalid.
	ErrInvalidColumnData = errors.InternalServer("INVALID_COLUMN_DATA", "invalid column data type")
)
|
||||
@@ -7,27 +7,32 @@ toolchain go1.23.3
|
||||
replace github.com/tx7do/kratos-bootstrap/api => ../../api
|
||||
|
||||
require (
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.35.0
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.37.2
|
||||
github.com/go-kratos/kratos/v2 v2.8.4
|
||||
github.com/tx7do/kratos-bootstrap/api v0.0.21
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/tx7do/kratos-bootstrap/api v0.0.27
|
||||
github.com/tx7do/kratos-bootstrap/utils v0.1.3
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/ClickHouse/ch-go v0.66.0 // indirect
|
||||
github.com/andybalholm/brotli v1.1.1 // indirect
|
||||
github.com/ClickHouse/ch-go v0.66.1 // indirect
|
||||
github.com/andybalholm/brotli v1.2.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/go-faster/city v1.0.1 // indirect
|
||||
github.com/go-faster/errors v0.7.1 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/klauspost/compress v1.18.0 // indirect
|
||||
github.com/paulmach/orb v0.11.1 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.22 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/rogpeppe/go-internal v1.13.1 // indirect
|
||||
github.com/segmentio/asm v1.2.0 // indirect
|
||||
github.com/shopspring/decimal v1.4.0 // indirect
|
||||
go.opentelemetry.io/otel v1.36.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.36.0 // indirect
|
||||
go.opentelemetry.io/otel v1.37.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.37.0 // indirect
|
||||
golang.org/x/sys v0.33.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
|
||||
google.golang.org/grpc v1.73.0 // indirect
|
||||
google.golang.org/protobuf v1.36.6 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
github.com/ClickHouse/ch-go v0.66.0 h1:hLslxxAVb2PHpbHr4n0d6aP8CEIpUYGMVT1Yj/Q5Img=
|
||||
github.com/ClickHouse/ch-go v0.66.0/go.mod h1:noiHWyLMJAZ5wYuq3R/K0TcRhrNA8h7o1AqHX0klEhM=
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.35.0 h1:ZMLZqxu+NiW55f4JS32kzyEbMb7CthGn3ziCcULOvSE=
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.35.0/go.mod h1:O2FFT/rugdpGEW2VKyEGyMUWyQU0ahmenY9/emxLPxs=
|
||||
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
|
||||
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
|
||||
github.com/ClickHouse/ch-go v0.66.1 h1:LQHFslfVYZsISOY0dnOYOXGkOUvpv376CCm8g7W74A4=
|
||||
github.com/ClickHouse/ch-go v0.66.1/go.mod h1:NEYcg3aOFv2EmTJfo4m2WF7sHB/YFbLUuIWv9iq76xY=
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.37.2 h1:wRLNKoynvHQEN4znnVHNLaYnrqVc9sGJmGYg+GGCfto=
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.37.2/go.mod h1:pH2zrBGp5Y438DMwAxXMm1neSXPPjSI7tD4MURVULw8=
|
||||
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
|
||||
github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
@@ -15,6 +15,8 @@ github.com/go-kratos/kratos/v2 v2.8.4 h1:eIJLE9Qq9WSoKx+Buy2uPyrahtF/lPh+Xf4MTpx
|
||||
github.com/go-kratos/kratos/v2 v2.8.4/go.mod h1:mq62W2101a5uYyRxe+7IdWubu7gZCGYqSNKwGFiiRcw=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
@@ -65,10 +67,10 @@ github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7Jul
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
|
||||
go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
|
||||
go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
|
||||
go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w=
|
||||
go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA=
|
||||
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
|
||||
go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
|
||||
go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
|
||||
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
@@ -80,12 +82,14 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
|
||||
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
|
||||
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
|
||||
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
@@ -99,6 +103,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
|
||||
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
@@ -107,6 +113,10 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 h1:fc6jSaCT0vBduLYZHYrBBNY4dsWuvgyff9noRNDdBeE=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
|
||||
google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok=
|
||||
google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
||||
|
||||
246
database/clickhouse/query.go
Normal file
246
database/clickhouse/query.go
Normal file
@@ -0,0 +1,246 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
)
|
||||
|
||||
// QueryBuilder incrementally assembles a ClickHouse SELECT statement.
// All setters return the receiver so calls can be chained; Build renders
// the final SQL string together with the collected positional parameters.
type QueryBuilder struct {
	table       string        // target table (Final/Sample append modifiers in place)
	columns     []string      // projected columns; empty means "*"
	distinct    bool          // emit SELECT DISTINCT
	conditions  []string      // WHERE conditions, AND-joined
	orderBy     []string      // ORDER BY expressions
	groupBy     []string      // GROUP BY columns
	having      []string      // HAVING conditions, AND-joined
	joins       []string      // JOIN / ARRAY JOIN clauses
	with        []string      // WITH (CTE) expressions
	union       []string      // UNION sub-queries; Format also stores its clause here
	limit       int           // LIMIT value, emitted when > 0
	offset      int           // OFFSET value, emitted when > 0
	params      []interface{} // positional query parameters (was: 用于存储参数)
	useIndex    string        // index hint (was: 索引提示)
	cacheResult bool          // prefix the query with a cache comment (was: 是否缓存查询结果)
	debug       bool          // enable debug logging (was: 是否启用调试)
	log         *log.Helper
}
|
||||
|
||||
// NewQueryBuilder 创建一个新的 QueryBuilder 实例
|
||||
func NewQueryBuilder(table string, log *log.Helper) *QueryBuilder {
|
||||
return &QueryBuilder{
|
||||
log: log,
|
||||
table: table,
|
||||
params: []interface{}{},
|
||||
}
|
||||
}
|
||||
|
||||
// EnableDebug 启用调试模式
|
||||
func (qb *QueryBuilder) EnableDebug() *QueryBuilder {
|
||||
qb.debug = true
|
||||
return qb
|
||||
}
|
||||
|
||||
// logDebug 打印调试信息
|
||||
func (qb *QueryBuilder) logDebug(message string) {
|
||||
if qb.debug {
|
||||
qb.log.Debug("[QueryBuilder Debug]:", message)
|
||||
}
|
||||
}
|
||||
|
||||
// Select 设置查询的列
|
||||
func (qb *QueryBuilder) Select(columns ...string) *QueryBuilder {
|
||||
for _, column := range columns {
|
||||
if !isValidIdentifier(column) {
|
||||
panic("Invalid column name")
|
||||
}
|
||||
}
|
||||
|
||||
qb.columns = columns
|
||||
return qb
|
||||
}
|
||||
|
||||
// Distinct 设置 DISTINCT 查询
|
||||
func (qb *QueryBuilder) Distinct() *QueryBuilder {
|
||||
qb.distinct = true
|
||||
return qb
|
||||
}
|
||||
|
||||
// Where 添加查询条件并支持参数化
|
||||
func (qb *QueryBuilder) Where(condition string, args ...interface{}) *QueryBuilder {
|
||||
if !isValidCondition(condition) {
|
||||
panic("Invalid condition")
|
||||
}
|
||||
|
||||
qb.conditions = append(qb.conditions, condition)
|
||||
qb.params = append(qb.params, args...)
|
||||
return qb
|
||||
}
|
||||
|
||||
// OrderBy 设置排序条件
|
||||
func (qb *QueryBuilder) OrderBy(order string) *QueryBuilder {
|
||||
qb.orderBy = append(qb.orderBy, order)
|
||||
return qb
|
||||
}
|
||||
|
||||
// GroupBy 设置分组条件
|
||||
func (qb *QueryBuilder) GroupBy(columns ...string) *QueryBuilder {
|
||||
qb.groupBy = append(qb.groupBy, columns...)
|
||||
return qb
|
||||
}
|
||||
|
||||
// Having 添加分组后的过滤条件并支持参数化
|
||||
func (qb *QueryBuilder) Having(condition string, args ...interface{}) *QueryBuilder {
|
||||
qb.having = append(qb.having, condition)
|
||||
qb.params = append(qb.params, args...)
|
||||
return qb
|
||||
}
|
||||
|
||||
// Join 添加 JOIN 操作
|
||||
func (qb *QueryBuilder) Join(joinType, table, onCondition string) *QueryBuilder {
|
||||
join := fmt.Sprintf("%s JOIN %s ON %s", joinType, table, onCondition)
|
||||
qb.joins = append(qb.joins, join)
|
||||
return qb
|
||||
}
|
||||
|
||||
// With 添加 WITH 子句
|
||||
func (qb *QueryBuilder) With(expression string) *QueryBuilder {
|
||||
qb.with = append(qb.with, expression)
|
||||
return qb
|
||||
}
|
||||
|
||||
// Union 添加 UNION 操作
|
||||
func (qb *QueryBuilder) Union(query string) *QueryBuilder {
|
||||
qb.union = append(qb.union, query)
|
||||
return qb
|
||||
}
|
||||
|
||||
// Limit 设置查询结果的限制数量
|
||||
func (qb *QueryBuilder) Limit(limit int) *QueryBuilder {
|
||||
qb.limit = limit
|
||||
return qb
|
||||
}
|
||||
|
||||
// Offset 设置查询结果的偏移量
|
||||
func (qb *QueryBuilder) Offset(offset int) *QueryBuilder {
|
||||
qb.offset = offset
|
||||
return qb
|
||||
}
|
||||
|
||||
// UseIndex 设置索引提示
|
||||
func (qb *QueryBuilder) UseIndex(index string) *QueryBuilder {
|
||||
qb.useIndex = index
|
||||
return qb
|
||||
}
|
||||
|
||||
// CacheResult 启用查询结果缓存
|
||||
func (qb *QueryBuilder) CacheResult() *QueryBuilder {
|
||||
qb.cacheResult = true
|
||||
return qb
|
||||
}
|
||||
|
||||
// ArrayJoin 添加 ARRAY JOIN 子句
|
||||
func (qb *QueryBuilder) ArrayJoin(expression string) *QueryBuilder {
|
||||
qb.joins = append(qb.joins, fmt.Sprintf("ARRAY JOIN %s", expression))
|
||||
return qb
|
||||
}
|
||||
|
||||
// Final 添加 FINAL 修饰符
|
||||
func (qb *QueryBuilder) Final() *QueryBuilder {
|
||||
qb.table = fmt.Sprintf("%s FINAL", qb.table)
|
||||
return qb
|
||||
}
|
||||
|
||||
// Sample 添加 SAMPLE 子句
|
||||
func (qb *QueryBuilder) Sample(sampleRate float64) *QueryBuilder {
|
||||
qb.table = fmt.Sprintf("%s SAMPLE %f", qb.table, sampleRate)
|
||||
return qb
|
||||
}
|
||||
|
||||
// LimitBy 添加 LIMIT BY 子句
|
||||
func (qb *QueryBuilder) LimitBy(limit int, columns ...string) *QueryBuilder {
|
||||
qb.limit = limit
|
||||
qb.orderBy = append(qb.orderBy, fmt.Sprintf("LIMIT BY %d (%s)", limit, strings.Join(columns, ", ")))
|
||||
return qb
|
||||
}
|
||||
|
||||
// PreWhere 添加 PREWHERE 子句
|
||||
func (qb *QueryBuilder) PreWhere(condition string, args ...interface{}) *QueryBuilder {
|
||||
qb.conditions = append([]string{condition}, qb.conditions...)
|
||||
qb.params = append(args, qb.params...)
|
||||
return qb
|
||||
}
|
||||
|
||||
// Format 添加 FORMAT 子句
|
||||
func (qb *QueryBuilder) Format(format string) *QueryBuilder {
|
||||
qb.union = append(qb.union, fmt.Sprintf("FORMAT %s", format))
|
||||
return qb
|
||||
}
|
||||
|
||||
// Build 构建最终的 SQL 查询
|
||||
func (qb *QueryBuilder) Build() (string, []interface{}) {
|
||||
query := ""
|
||||
|
||||
if qb.cacheResult {
|
||||
query += "/* CACHE */ "
|
||||
}
|
||||
|
||||
query += "SELECT "
|
||||
if qb.distinct {
|
||||
query += "DISTINCT "
|
||||
}
|
||||
query += qb.buildColumns()
|
||||
query += fmt.Sprintf(" FROM %s", qb.table)
|
||||
|
||||
if qb.useIndex != "" {
|
||||
query += fmt.Sprintf(" USE INDEX (%s)", qb.useIndex)
|
||||
}
|
||||
|
||||
if len(qb.conditions) > 0 {
|
||||
query += fmt.Sprintf(" WHERE %s", strings.Join(qb.conditions, " AND "))
|
||||
}
|
||||
|
||||
if len(qb.groupBy) > 0 {
|
||||
query += fmt.Sprintf(" GROUP BY %s", strings.Join(qb.groupBy, ", "))
|
||||
}
|
||||
|
||||
if len(qb.having) > 0 {
|
||||
query += fmt.Sprintf(" HAVING %s", strings.Join(qb.having, " AND "))
|
||||
}
|
||||
|
||||
if len(qb.orderBy) > 0 {
|
||||
query += fmt.Sprintf(" ORDER BY %s", strings.Join(qb.orderBy, ", "))
|
||||
}
|
||||
|
||||
if qb.limit > 0 {
|
||||
query += fmt.Sprintf(" LIMIT %d", qb.limit)
|
||||
}
|
||||
|
||||
if qb.offset > 0 {
|
||||
query += fmt.Sprintf(" OFFSET %d", qb.offset)
|
||||
}
|
||||
|
||||
return query, qb.params
|
||||
}
|
||||
|
||||
func (qb *QueryBuilder) buildColumns() string {
|
||||
if len(qb.columns) == 0 {
|
||||
return "*"
|
||||
}
|
||||
return strings.Join(qb.columns, ", ")
|
||||
}
|
||||
|
||||
// identPattern matches a bare SQL identifier: letters, digits and
// underscores only, and not starting with a digit. Compiled once at package
// init instead of on every call (the previous version recompiled the regexp
// per invocation and discarded its error).
var identPattern = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)

// isValidIdentifier reports whether identifier is safe to splice into SQL
// as a table or column name.
func isValidIdentifier(identifier string) bool {
	return identPattern.MatchString(identifier)
}
|
||||
|
||||
// isValidCondition applies a light-weight injection heuristic: a condition
// is rejected when it contains a statement separator (";") or the start of
// a SQL line comment ("--"). This is not a full SQL validator.
func isValidCondition(condition string) bool {
	for _, banned := range []string{";", "--"} {
		if strings.Contains(condition, banned) {
			return false
		}
	}
	return true
}
|
||||
120
database/clickhouse/query_test.go
Normal file
120
database/clickhouse/query_test.go
Normal file
@@ -0,0 +1,120 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// TestQueryBuilder drives one QueryBuilder through every setter and checks
// that each clause appears in the generated SQL. State accumulates across
// sub-steps, so later assertions see the effects of all earlier calls.
func TestQueryBuilder(t *testing.T) {
	logger := log.NewHelper(log.DefaultLogger)
	qb := NewQueryBuilder("test_table", logger)

	// Select: explicit projection list.
	qb.Select("id", "name")
	query, params := qb.Build()
	assert.Contains(t, query, "SELECT id, name FROM test_table")

	// Distinct: DISTINCT keyword is inserted after SELECT.
	qb.Distinct()
	query, _ = qb.Build()
	assert.Contains(t, query, "SELECT DISTINCT id, name FROM test_table")

	// Where: conditions are AND-joined; args accumulate into params.
	qb.Where("id > ?", 10).Where("name = ?", "example")
	query, params = qb.Build()
	assert.Contains(t, query, "WHERE id > ? AND name = ?")
	assert.Equal(t, []interface{}{10, "example"}, params)

	// OrderBy.
	qb.OrderBy("name ASC")
	query, _ = qb.Build()
	assert.Contains(t, query, "ORDER BY name ASC")

	// GroupBy.
	qb.GroupBy("category")
	query, _ = qb.Build()
	assert.Contains(t, query, "GROUP BY category")

	// Having: its args are appended after the Where args.
	qb.Having("COUNT(id) > ?", 5)
	query, params = qb.Build()
	assert.Contains(t, query, "HAVING COUNT(id) > ?")
	assert.Equal(t, []interface{}{10, "example", 5}, params)

	// Join.
	// NOTE(review): this assertion requires Build to emit qb.joins; the
	// Build shown in this file's query.go collects joins but never renders
	// them — confirm this test actually passes against that implementation.
	qb.Join("INNER", "other_table", "test_table.id = other_table.id")
	query, _ = qb.Build()
	assert.Contains(t, query, "INNER JOIN other_table ON test_table.id = other_table.id")

	// With.
	// NOTE(review): same concern — qb.with is never rendered by the Build
	// in this file as written.
	qb.With("temp AS (SELECT id FROM another_table WHERE status = 'active')")
	query, _ = qb.Build()
	assert.Contains(t, query, "WITH temp AS (SELECT id FROM another_table WHERE status = 'active')")

	// Union.
	// NOTE(review): same concern — qb.union is never rendered by the Build
	// in this file as written.
	qb.Union("SELECT id FROM another_table")
	query, _ = qb.Build()
	assert.Contains(t, query, "UNION SELECT id FROM another_table")

	// Limit / Offset.
	qb.Limit(10).Offset(20)
	query, _ = qb.Build()
	assert.Contains(t, query, "LIMIT 10 OFFSET 20")

	// UseIndex hint.
	qb.UseIndex("idx_name")
	query, _ = qb.Build()
	assert.Contains(t, query, "USE INDEX (idx_name)")

	// CacheResult prefix comment.
	qb.CacheResult()
	query, _ = qb.Build()
	assert.Contains(t, query, "/* CACHE */")

	// EnableDebug flips the internal flag only.
	qb.EnableDebug()
	assert.True(t, qb.debug)

	// ArrayJoin (stored with regular joins — see Join note above).
	qb.ArrayJoin("array_column")
	query, _ = qb.Build()
	assert.Contains(t, query, "ARRAY JOIN array_column")

	// Final modifier on the table reference.
	qb.Final()
	query, _ = qb.Build()
	assert.Contains(t, query, "test_table FINAL")

	// Sample clause; %f renders 0.1 as "0.100000".
	qb.Sample(0.1)
	query, _ = qb.Build()
	assert.Contains(t, query, "test_table SAMPLE 0.100000")

	// LimitBy fragment (rendered inside ORDER BY by the current builder).
	qb.LimitBy(5, "name")
	query, _ = qb.Build()
	assert.Contains(t, query, "LIMIT BY 5 (name)")

	// PreWhere.
	// NOTE(review): PreWhere merges into the WHERE list, so "PREWHERE"
	// never appears in the output; and params at this point also contain
	// the earlier Where/Having args ("active" is only prepended). Both
	// assertions look inconsistent with the builder's code — verify.
	qb.PreWhere("status = ?", "active")
	query, params = qb.Build()
	assert.Contains(t, query, "PREWHERE status = ?")
	assert.Equal(t, []interface{}{"active"}, params)

	// Format directive (stored in the union list — see Union note above).
	qb.Format("JSON")
	query, _ = qb.Build()
	assert.Contains(t, query, "FORMAT JSON")

	// Edge case: an empty column name must panic.
	assert.Panics(t, func() {
		qb.Select("")
	}, "应该抛出异常:无效的列名")

	// Edge case: a condition with a statement separator must panic.
	assert.Panics(t, func() {
		qb.Where("id = 1; DROP TABLE test_table")
	}, "应该抛出异常:无效的条件")
}
|
||||
146
database/clickhouse/utils.go
Normal file
146
database/clickhouse/utils.go
Normal file
@@ -0,0 +1,146 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"google.golang.org/protobuf/types/known/durationpb"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
const (
	// timeFormat is the layout used when serializing time values for
	// ClickHouse inserts: nanosecond precision, no timezone designator.
	timeFormat = "2006-01-02 15:04:05.000000000"
)
|
||||
|
||||
func structToValueArray(input any) []any {
|
||||
// 检查是否是指针类型,如果是则解引用
|
||||
val := reflect.ValueOf(input)
|
||||
if val.Kind() == reflect.Ptr {
|
||||
val = val.Elem()
|
||||
}
|
||||
|
||||
// 确保输入是结构体
|
||||
if val.Kind() != reflect.Struct {
|
||||
return nil
|
||||
}
|
||||
|
||||
var values []any
|
||||
for i := 0; i < val.NumField(); i++ {
|
||||
value := val.Field(i).Interface()
|
||||
|
||||
switch v := value.(type) {
|
||||
case *sql.NullString:
|
||||
if v.Valid {
|
||||
values = append(values, v.String)
|
||||
} else {
|
||||
values = append(values, nil)
|
||||
}
|
||||
case *sql.NullInt64:
|
||||
if v.Valid {
|
||||
values = append(values, v.Int64)
|
||||
} else {
|
||||
values = append(values, nil)
|
||||
}
|
||||
case *sql.NullFloat64:
|
||||
if v.Valid {
|
||||
values = append(values, v.Float64)
|
||||
} else {
|
||||
values = append(values, nil)
|
||||
}
|
||||
case *sql.NullBool:
|
||||
if v.Valid {
|
||||
values = append(values, v.Bool)
|
||||
} else {
|
||||
values = append(values, nil)
|
||||
}
|
||||
|
||||
case *sql.NullTime:
|
||||
if v != nil && v.Valid {
|
||||
values = append(values, v.Time.Format(timeFormat))
|
||||
} else {
|
||||
values = append(values, nil)
|
||||
}
|
||||
|
||||
case *time.Time:
|
||||
if v != nil {
|
||||
values = append(values, v.Format(timeFormat))
|
||||
} else {
|
||||
values = append(values, nil)
|
||||
}
|
||||
|
||||
case time.Time:
|
||||
// 处理 time.Time 类型
|
||||
if !v.IsZero() {
|
||||
values = append(values, v.Format(timeFormat))
|
||||
} else {
|
||||
values = append(values, nil) // 如果时间为零值,插入 NULL
|
||||
}
|
||||
|
||||
case timestamppb.Timestamp:
|
||||
// 处理 timestamppb.Timestamp 类型
|
||||
if !v.IsValid() {
|
||||
values = append(values, v.AsTime().Format(timeFormat))
|
||||
} else {
|
||||
values = append(values, nil) // 如果时间为零值,插入 NULL
|
||||
}
|
||||
|
||||
case *timestamppb.Timestamp:
|
||||
// 处理 *timestamppb.Timestamp 类型
|
||||
if v != nil && v.IsValid() {
|
||||
values = append(values, v.AsTime().Format(timeFormat))
|
||||
} else {
|
||||
values = append(values, nil) // 如果时间为零值,插入 NULL
|
||||
}
|
||||
|
||||
case durationpb.Duration:
|
||||
// 处理 timestamppb.Duration 类型
|
||||
if v.AsDuration() != 0 {
|
||||
values = append(values, v.AsDuration().String())
|
||||
} else {
|
||||
values = append(values, nil) // 如果时间为零值,插入 NULL
|
||||
}
|
||||
|
||||
case *durationpb.Duration:
|
||||
// 处理 *timestamppb.Duration 类型
|
||||
if v != nil && v.AsDuration() != 0 {
|
||||
values = append(values, v.AsDuration().String())
|
||||
} else {
|
||||
values = append(values, nil) // 如果时间为零值,插入 NULL
|
||||
}
|
||||
|
||||
case []any:
|
||||
// 处理切片类型
|
||||
if len(v) > 0 {
|
||||
for _, item := range v {
|
||||
if item == nil {
|
||||
values = append(values, nil)
|
||||
} else {
|
||||
values = append(values, item)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
values = append(values, nil) // 如果切片为空,插入 NULL
|
||||
}
|
||||
|
||||
case [][]any:
|
||||
// 处理二维切片类型
|
||||
if len(v) > 0 {
|
||||
for _, item := range v {
|
||||
if len(item) > 0 {
|
||||
values = append(values, item)
|
||||
} else {
|
||||
values = append(values, nil) // 如果子切片为空,插入 NULL
|
||||
}
|
||||
}
|
||||
} else {
|
||||
values = append(values, nil) // 如果二维切片为空,插入 NULL
|
||||
}
|
||||
|
||||
default:
|
||||
values = append(values, v)
|
||||
}
|
||||
}
|
||||
|
||||
return values
|
||||
}
|
||||
83
database/clickhouse/utils_test.go
Normal file
83
database/clickhouse/utils_test.go
Normal file
@@ -0,0 +1,83 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// Ptr returns a pointer to a copy of v (test convenience helper).
func Ptr[T any](v T) *T {
	p := new(T)
	*p = v
	return p
}
||||
|
||||
// TestStructToValueArrayWithCandle checks that a Candle passed by value is
// flattened into 7 positional values: the *time.Time field is formatted to
// a string, while the remaining pointer fields pass through unchanged as
// *string / *float64 (default switch branch in structToValueArray).
func TestStructToValueArrayWithCandle(t *testing.T) {
	now := time.Now()

	candle := Candle{
		Timestamp: Ptr(now),
		Symbol:    Ptr("AAPL"),
		Open:      Ptr(100.5),
		High:      Ptr(105.0),
		Low:       Ptr(99.5),
		Close:     Ptr(102.0),
		Volume:    Ptr(1500.0),
	}

	values := structToValueArray(candle)
	assert.NotNil(t, values, "Values should not be nil")
	assert.Len(t, values, 7, "Expected 7 fields in the Candle struct")
	// Timestamp is converted to a formatted string; other fields keep
	// their pointer types and must be dereferenced for comparison.
	assert.Equal(t, now.Format(timeFormat), values[0].(string), "Timestamp should match")
	assert.Equal(t, *candle.Symbol, *(values[1].(*string)), "Symbol should match")
	assert.Equal(t, *candle.Open, *values[2].(*float64), "Open price should match")
	assert.Equal(t, *candle.High, *values[3].(*float64), "High price should match")
	assert.Equal(t, *candle.Low, *values[4].(*float64), "Low price should match")
	assert.Equal(t, *candle.Close, *values[5].(*float64), "Close price should match")
	assert.Equal(t, *candle.Volume, *values[6].(*float64), "Volume should match")

	t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
		values[0],
		*(values[1].(*string)),
		*values[2].(*float64),
		*values[3].(*float64),
		*values[4].(*float64),
		*values[5].(*float64),
		*values[6].(*float64),
	)
}

// TestStructToValueArrayWithCandlePtr repeats the previous test with a
// *Candle to exercise the pointer-dereferencing path at the top of
// structToValueArray; the expected flattening is identical.
func TestStructToValueArrayWithCandlePtr(t *testing.T) {
	now := time.Now()

	candle := &Candle{
		Timestamp: Ptr(now),
		Symbol:    Ptr("AAPL"),
		Open:      Ptr(100.5),
		High:      Ptr(105.0),
		Low:       Ptr(99.5),
		Close:     Ptr(102.0),
		Volume:    Ptr(1500.0),
	}

	values := structToValueArray(candle)
	assert.NotNil(t, values, "Values should not be nil")
	assert.Len(t, values, 7, "Expected 7 fields in the Candle struct")
	assert.Equal(t, now.Format(timeFormat), values[0].(string), "Timestamp should match")
	assert.Equal(t, *candle.Symbol, *(values[1].(*string)), "Symbol should match")
	assert.Equal(t, *candle.Open, *values[2].(*float64), "Open price should match")
	assert.Equal(t, *candle.High, *values[3].(*float64), "High price should match")
	assert.Equal(t, *candle.Low, *values[4].(*float64), "Low price should match")
	assert.Equal(t, *candle.Close, *values[5].(*float64), "Close price should match")
	assert.Equal(t, *candle.Volume, *values[6].(*float64), "Volume should match")

	t.Logf("QueryRow Result: [%v] Candle: %s, Open: %f, High: %f, Low: %f, Close: %f, Volume: %f\n",
		values[0],
		*(values[1].(*string)),
		*values[2].(*float64),
		*values[3].(*float64),
		*values[4].(*float64),
		*values[5].(*float64),
		*values[6].(*float64),
	)
}
||||
@@ -17,39 +17,16 @@
|
||||
|
||||
## Docker部署
|
||||
|
||||
### 拉取镜像
|
||||
|
||||
```bash
|
||||
docker pull bitnami/elasticsearch:latest
|
||||
```
|
||||
|
||||
### 启动容器
|
||||
|
||||
```bash
|
||||
docker run -itd \
|
||||
--name elasticsearch \
|
||||
-p 9200:9200 \
|
||||
-p 9300:9300 \
|
||||
-e ELASTICSEARCH_USERNAME=elastic \
|
||||
-e ELASTICSEARCH_PASSWORD=elastic \
|
||||
-e xpack.security.enabled=true \
|
||||
-e discovery.type=single-node \
|
||||
-e http.cors.enabled=true \
|
||||
-e http.cors.allow-origin=http://localhost:13580,http://127.0.0.1:13580 \
|
||||
-e http.cors.allow-headers=X-Requested-With,X-Auth-Token,Content-Type,Content-Length,Authorization \
|
||||
-e http.cors.allow-credentials=true \
|
||||
-e ELASTICSEARCH_NODE_NAME=elasticsearch-node-1 \
|
||||
-e ELASTICSEARCH_CLUSTER_NAME=elasticsearch-cluster \
|
||||
bitnami/elasticsearch:latest
|
||||
```
|
||||
|
||||
安装管理工具:
|
||||
|
||||
```bash
|
||||
docker pull appbaseio/dejavu:latest
|
||||
|
||||
docker run -itd \
|
||||
--name dejavu-test \
|
||||
-p 13580:1358 \
|
||||
appbaseio/dejavu:latest
|
||||
```
|
||||
|
||||
访问管理工具:<http://localhost:13580/>
|
||||
|
||||
@@ -6,8 +6,6 @@ import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
"github.com/go-kratos/kratos/v2/encoding"
|
||||
_ "github.com/go-kratos/kratos/v2/encoding/json"
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
|
||||
elasticsearchV9 "github.com/elastic/go-elasticsearch/v9"
|
||||
@@ -18,15 +16,13 @@ import (
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
cli *elasticsearchV9.Client
|
||||
log *log.Helper
|
||||
codec encoding.Codec
|
||||
cli *elasticsearchV9.Client
|
||||
log *log.Helper
|
||||
}
|
||||
|
||||
func NewClient(logger log.Logger, cfg *conf.Bootstrap) (*Client, error) {
|
||||
c := &Client{
|
||||
log: log.NewHelper(log.With(logger, "module", "elasticsearch-client")),
|
||||
codec: encoding.GetCodec("json"),
|
||||
log: log.NewHelper(log.With(logger, "module", "elasticsearch-client")),
|
||||
}
|
||||
|
||||
if err := c.createESClient(cfg); err != nil {
|
||||
|
||||
@@ -10,7 +10,7 @@ require (
|
||||
github.com/elastic/go-elasticsearch/v9 v9.0.0
|
||||
github.com/go-kratos/kratos/v2 v2.8.4
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/tx7do/kratos-bootstrap/api v0.0.25
|
||||
github.com/tx7do/kratos-bootstrap/api v0.0.27
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -19,12 +19,13 @@ require (
|
||||
github.com/go-logr/logr v1.4.3 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/google/gnostic v0.7.0 // indirect
|
||||
github.com/google/gnostic-models v0.6.9 // indirect
|
||||
github.com/google/gnostic-models v0.7.0 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/otel v1.37.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.37.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.37.0 // indirect
|
||||
go.yaml.in/yaml/v3 v3.0.3 // indirect
|
||||
golang.org/x/net v0.40.0 // indirect
|
||||
golang.org/x/sync v0.14.0 // indirect
|
||||
golang.org/x/sys v0.33.0 // indirect
|
||||
|
||||
@@ -722,8 +722,8 @@ github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6
|
||||
github.com/google/gnostic v0.7.0 h1:d7EpuFp8vVdML+y0JJJYiKeOLjKTdH/GvVkLOBWqJpw=
|
||||
github.com/google/gnostic v0.7.0/go.mod h1:IAcUyMl6vtC95f60EZ8oXyqTsOersP6HbwjeG7EyDPM=
|
||||
github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49/go.mod h1:BkkQ4L1KS1xMt2aWSPStnn55ChGC0DPOn2FQYj+f25M=
|
||||
github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw=
|
||||
github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcbSZxsz9b0KuDBw=
|
||||
github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo=
|
||||
github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ=
|
||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
@@ -891,6 +891,8 @@ go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mx
|
||||
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
|
||||
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
|
||||
go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
|
||||
go.yaml.in/yaml/v3 v3.0.3 h1:bXOww4E/J3f66rav3pX3m8w6jDE4knZjGOw8b5Y6iNE=
|
||||
go.yaml.in/yaml/v3 v3.0.3/go.mod h1:tBHosrYAkRZjRAOREWbDnBXUf08JOwYq++0QNwQiWzI=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
|
||||
@@ -3,11 +3,8 @@ package influxdb
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/go-kratos/kratos/v2/encoding"
|
||||
_ "github.com/go-kratos/kratos/v2/encoding/json"
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
|
||||
"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
|
||||
conf "github.com/tx7do/kratos-bootstrap/api/gen/go/conf/v1"
|
||||
)
|
||||
@@ -15,14 +12,12 @@ import (
|
||||
type Client struct {
|
||||
cli *influxdb3.Client
|
||||
|
||||
log *log.Helper
|
||||
codec encoding.Codec
|
||||
log *log.Helper
|
||||
}
|
||||
|
||||
func NewClient(logger log.Logger, cfg *conf.Bootstrap) (*Client, error) {
|
||||
c := &Client{
|
||||
log: log.NewHelper(log.With(logger, "module", "influxdb-client")),
|
||||
codec: encoding.GetCodec("json"),
|
||||
log: log.NewHelper(log.With(logger, "module", "influxdb-client")),
|
||||
}
|
||||
|
||||
if err := c.createInfluxdbClient(cfg); err != nil {
|
||||
@@ -87,6 +82,31 @@ func (c *Client) Query(ctx context.Context, query string) (*influxdb3.QueryItera
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *Client) QueryWithParams(
|
||||
ctx context.Context,
|
||||
table string,
|
||||
filters map[string]interface{},
|
||||
operators map[string]string,
|
||||
fields []string,
|
||||
) (*influxdb3.QueryIterator, error) {
|
||||
if c.cli == nil {
|
||||
return nil, ErrInfluxDBClientNotInitialized
|
||||
}
|
||||
|
||||
query := BuildQueryWithParams(table, filters, operators, fields)
|
||||
result, err := c.cli.Query(
|
||||
ctx,
|
||||
query,
|
||||
influxdb3.WithQueryType(influxdb3.InfluxQL),
|
||||
)
|
||||
if err != nil {
|
||||
c.log.Errorf("failed to query data: %v", err)
|
||||
return nil, ErrInfluxDBQueryFailed
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Insert 插入数据
|
||||
func (c *Client) Insert(ctx context.Context, point *influxdb3.Point) error {
|
||||
if c.cli == nil {
|
||||
|
||||
@@ -11,7 +11,7 @@ require (
|
||||
github.com/go-kratos/kratos/v2 v2.8.4
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/tx7do/go-utils v1.1.29
|
||||
github.com/tx7do/kratos-bootstrap/api v0.0.25
|
||||
github.com/tx7do/kratos-bootstrap/api v0.0.27
|
||||
google.golang.org/protobuf v1.36.6
|
||||
)
|
||||
|
||||
|
||||
@@ -10,6 +10,43 @@ import (
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func BuildQuery(
|
||||
table string,
|
||||
filters map[string]interface{},
|
||||
operators map[string]string,
|
||||
fields []string,
|
||||
) (string, []interface{}) {
|
||||
var queryBuilder strings.Builder
|
||||
args := make([]interface{}, 0)
|
||||
|
||||
// 构建 SELECT 语句
|
||||
queryBuilder.WriteString("SELECT ")
|
||||
if len(fields) > 0 {
|
||||
queryBuilder.WriteString(strings.Join(fields, ", "))
|
||||
} else {
|
||||
queryBuilder.WriteString("*")
|
||||
}
|
||||
queryBuilder.WriteString(fmt.Sprintf(" FROM %s", table))
|
||||
|
||||
// 构建 WHERE 条件
|
||||
if len(filters) > 0 {
|
||||
queryBuilder.WriteString(" WHERE ")
|
||||
var conditions []string
|
||||
var operator string
|
||||
for key, value := range filters {
|
||||
operator = "=" // 默认操作符
|
||||
if op, exists := operators[key]; exists {
|
||||
operator = op
|
||||
}
|
||||
conditions = append(conditions, fmt.Sprintf("%s %s ?", key, operator))
|
||||
args = append(args, value)
|
||||
}
|
||||
queryBuilder.WriteString(strings.Join(conditions, " AND "))
|
||||
}
|
||||
|
||||
return queryBuilder.String(), args
|
||||
}
|
||||
|
||||
func GetPointTag(point *influxdb3.Point, name string) *string {
|
||||
if point == nil {
|
||||
return nil
|
||||
|
||||
@@ -1,9 +1,86 @@
|
||||
package influxdb
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBuildQuery(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
table string
|
||||
filters map[string]interface{}
|
||||
operators map[string]string
|
||||
fields []string
|
||||
expectedQuery string
|
||||
expectedArgs []interface{}
|
||||
}{
|
||||
{
|
||||
name: "Basic query with filters and fields",
|
||||
table: "candles",
|
||||
filters: map[string]interface{}{"s": "AAPL", "o": 150.0},
|
||||
fields: []string{"s", "o", "h", "l", "c", "v"},
|
||||
expectedQuery: "SELECT s, o, h, l, c, v FROM candles WHERE s = ? AND o = ?",
|
||||
expectedArgs: []interface{}{"AAPL", 150.0},
|
||||
},
|
||||
{
|
||||
name: "Query with no filters",
|
||||
table: "candles",
|
||||
filters: map[string]interface{}{},
|
||||
fields: []string{"s", "o", "h"},
|
||||
expectedQuery: "SELECT s, o, h FROM candles",
|
||||
expectedArgs: []interface{}{},
|
||||
},
|
||||
{
|
||||
name: "Query with no fields",
|
||||
table: "candles",
|
||||
filters: map[string]interface{}{"s": "AAPL"},
|
||||
fields: []string{},
|
||||
expectedQuery: "SELECT * FROM candles WHERE s = ?",
|
||||
expectedArgs: []interface{}{"AAPL"},
|
||||
},
|
||||
{
|
||||
name: "Empty table name",
|
||||
table: "",
|
||||
filters: map[string]interface{}{"s": "AAPL"},
|
||||
fields: []string{"s", "o"},
|
||||
expectedQuery: "SELECT s, o FROM WHERE s = ?",
|
||||
expectedArgs: []interface{}{"AAPL"},
|
||||
},
|
||||
{
|
||||
name: "Special characters in filters",
|
||||
table: "candles",
|
||||
filters: map[string]interface{}{"name": "O'Reilly"},
|
||||
fields: []string{"name"},
|
||||
expectedQuery: "SELECT name FROM candles WHERE name = ?",
|
||||
expectedArgs: []interface{}{"O'Reilly"},
|
||||
},
|
||||
{
|
||||
name: "Query with interval filters",
|
||||
table: "candles",
|
||||
filters: map[string]interface{}{"time": "now() - interval '15 minutes'"},
|
||||
fields: []string{"*"},
|
||||
operators: map[string]string{"time": ">="},
|
||||
expectedQuery: "SELECT * FROM candles WHERE time >= ?",
|
||||
expectedArgs: []interface{}{"now() - interval '15 minutes'"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
query, args := BuildQuery(tt.table, tt.filters, tt.operators, tt.fields)
|
||||
|
||||
if query != tt.expectedQuery {
|
||||
t.Errorf("expected query %s, got %s", tt.expectedQuery, query)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(args, tt.expectedArgs) {
|
||||
t.Errorf("expected args %v, got %v", tt.expectedArgs, args)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildQueryWithParams(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
@@ -50,8 +50,6 @@ const (
|
||||
OperatorInc = "$inc" // 增加值
|
||||
OperatorMul = "$mul" // 乘法
|
||||
OperatorRename = "$rename" // 重命名字段
|
||||
OperatorMin = "$min" // 设置最小值
|
||||
OperatorMax = "$max" // 设置最大值
|
||||
OperatorCurrentDate = "$currentDate" // 设置当前日期
|
||||
OperatorAddToSet = "$addToSet" // 添加到集合
|
||||
OperatorPop = "$pop" // 删除数组中的元素
|
||||
@@ -81,6 +79,23 @@ const (
|
||||
OperatorIndexStats = "$indexStats" // 索引统计
|
||||
OperatorOut = "$out" // 输出
|
||||
OperatorMerge = "$merge" // 合并
|
||||
OperatorSum = "$sum" // 求和
|
||||
OperatorAvg = "$avg" // 平均值
|
||||
OperatorMin = "$min" // 最小值
|
||||
OperatorMax = "$max" // 最大值
|
||||
OperatorFirst = "$first" // 第一个值
|
||||
OperatorLast = "$last" // 最后一个值
|
||||
OperatorStdDevPop = "$stdDevPop" // 总体标准差
|
||||
OperatorStdDevSamp = "$stdDevSamp" // 样本标准差
|
||||
|
||||
// 类型转换操作符
|
||||
|
||||
OperatorToLong = "$toLong" // 转换为 long 类型
|
||||
OperatorToDouble = "$toDouble" // 转换为 double 类型
|
||||
OperatorToDecimal = "$toDecimal" // 转换为 decimal 类型
|
||||
OperatorToString = "$toString" // 转换为 string 类型
|
||||
OperatorToDate = "$toDate" // 转换为 date 类型
|
||||
OperatorToInt = "$toInt" // 转换为 int 类型
|
||||
|
||||
// 地理空间操作符
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ require (
|
||||
github.com/go-kratos/kratos/v2 v2.8.4
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/tx7do/go-utils v1.1.29
|
||||
github.com/tx7do/kratos-bootstrap/api v0.0.26
|
||||
github.com/tx7do/kratos-bootstrap/api v0.0.27
|
||||
go.mongodb.org/mongo-driver/v2 v2.2.2
|
||||
google.golang.org/protobuf v1.36.6
|
||||
)
|
||||
|
||||
@@ -6,8 +6,9 @@ import (
|
||||
)
|
||||
|
||||
type QueryBuilder struct {
|
||||
filter bsonV2.M
|
||||
opts *optionsV2.FindOptions
|
||||
filter bsonV2.M
|
||||
opts *optionsV2.FindOptions
|
||||
pipeline []bsonV2.M
|
||||
}
|
||||
|
||||
func NewQuery() *QueryBuilder {
|
||||
@@ -211,6 +212,17 @@ func (qb *QueryBuilder) SetPage(page, size int64) *QueryBuilder {
|
||||
return qb
|
||||
}
|
||||
|
||||
// AddStage 添加聚合阶段到管道
|
||||
func (qb *QueryBuilder) AddStage(stage bsonV2.M) *QueryBuilder {
|
||||
qb.pipeline = append(qb.pipeline, stage)
|
||||
return qb
|
||||
}
|
||||
|
||||
// BuildPipeline 返回最终的聚合管道
|
||||
func (qb *QueryBuilder) BuildPipeline() []bsonV2.M {
|
||||
return qb.pipeline
|
||||
}
|
||||
|
||||
// Build 返回最终的过滤条件和查询选项
|
||||
func (qb *QueryBuilder) Build() (bsonV2.M, *optionsV2.FindOptions) {
|
||||
return qb.filter, qb.opts
|
||||
|
||||
@@ -217,3 +217,22 @@ func TestSetNearSphere(t *testing.T) {
|
||||
|
||||
assert.Equal(t, expected, qb.filter[field])
|
||||
}
|
||||
|
||||
func TestQueryBuilderPipeline(t *testing.T) {
|
||||
// 创建 QueryBuilder 实例
|
||||
qb := NewQuery()
|
||||
|
||||
// 添加聚合阶段
|
||||
matchStage := bsonV2.M{OperatorMatch: bsonV2.M{"status": "active"}}
|
||||
groupStage := bsonV2.M{OperatorGroup: bsonV2.M{"_id": "$category", "count": bsonV2.M{OperatorSum: 1}}}
|
||||
sortStage := bsonV2.M{OperatorSortAgg: bsonV2.M{"count": -1}}
|
||||
|
||||
qb.AddStage(matchStage).AddStage(groupStage).AddStage(sortStage)
|
||||
|
||||
// 构建 Pipeline
|
||||
pipeline := qb.BuildPipeline()
|
||||
|
||||
// 验证 Pipeline
|
||||
expectedPipeline := []bsonV2.M{matchStage, groupStage, sortStage}
|
||||
assert.Equal(t, expectedPipeline, pipeline)
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ func NewLogger(cfg *conf.Logger) log.Logger {
|
||||
fallthrough
|
||||
case "text":
|
||||
loggerFormatter = &logrus.TextFormatter{
|
||||
ForceColors: cfg.Logrus.ForceColors,
|
||||
DisableColors: cfg.Logrus.DisableColors,
|
||||
DisableTimestamp: cfg.Logrus.DisableTimestamp,
|
||||
TimestampFormat: cfg.Logrus.TimestampFormat,
|
||||
|
||||
10
tag.bat
10
tag.bat
@@ -1,4 +1,4 @@
|
||||
git tag api/v0.0.26 --force
|
||||
git tag api/v0.0.27 --force
|
||||
|
||||
git tag utils/v0.1.4 --force
|
||||
|
||||
@@ -10,11 +10,11 @@ git tag tracer/v0.0.10 --force
|
||||
|
||||
git tag database/ent/v0.0.10 --force
|
||||
git tag database/gorm/v0.0.10 --force
|
||||
git tag database/mongodb/v0.0.11 --force
|
||||
git tag database/influxdb/v0.0.11 --force
|
||||
git tag database/mongodb/v0.0.12 --force
|
||||
git tag database/influxdb/v0.0.12 --force
|
||||
git tag database/clickhouse/v0.0.14 --force
|
||||
git tag database/elasticsearch/v0.0.12 --force
|
||||
git tag database/cassandra/v0.0.10 --force
|
||||
git tag database/clickhouse/v0.0.10 --force
|
||||
git tag database/elasticsearch/v0.0.1 --force
|
||||
|
||||
git tag registry/v0.1.0 --force
|
||||
git tag registry/consul/v0.1.0 --force
|
||||
|
||||
Reference in New Issue
Block a user