Skip to content

Commit

Permalink
perf: format kafka topic
Browse files Browse the repository at this point in the history
  • Loading branch information
sunhongtao committed Dec 6, 2023
1 parent 4c5cd68 commit 9819b90
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 28 deletions.
2 changes: 1 addition & 1 deletion blockchain/service/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func (h *HttpHandler) SendRawTx(ctx *gin.Context) {

defer func(backup map[string]any, chainCode int64) {
bs, _ := json.Marshal(backup)
msg := &kafka.Message{Topic: fmt.Sprintf("%v_%v", h.kafkaCfg.Topic, chainCode), Partition: h.kafkaCfg.Partition, Key: []byte(fmt.Sprintf("%v", time.Now().UnixNano())), Value: bs}
msg := &kafka.Message{Topic: fmt.Sprintf("%v-%v", h.kafkaCfg.Topic, chainCode), Partition: h.kafkaCfg.Partition, Key: []byte(fmt.Sprintf("%v", time.Now().UnixNano())), Value: bs}
h.sendCh <- []*kafka.Message{msg}
}(backup, blockChainCode)

Expand Down
6 changes: 3 additions & 3 deletions collect/service/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ func (c *Cmd) HandlerBlock(block *collect.BlockInterface) (*kafka.Message, error
}
r = b
}
return &kafka.Message{Topic: k.Topic, Partition: k.Partition, Time: time.Now(), Key: []byte(block.BlockHash), Value: r}, nil
return &kafka.Message{Topic: fmt.Sprintf("%v-%v", c.chain.BlockChainCode, k.Topic), Partition: k.Partition, Time: time.Now(), Key: []byte(block.BlockHash), Value: r}, nil
}

func (c *Cmd) HandlerTx(tx *collect.TxInterface) (*kafka.Message, error) {
Expand All @@ -808,7 +808,7 @@ func (c *Cmd) HandlerTx(tx *collect.TxInterface) (*kafka.Message, error) {
r = b
}

return &kafka.Message{Topic: k.Topic, Partition: k.Partition, Time: time.Now(), Key: []byte(tx.TxHash), Value: r}, nil
return &kafka.Message{Topic: fmt.Sprintf("%v-%v", c.chain.BlockChainCode, k.Topic), Partition: k.Partition, Time: time.Now(), Key: []byte(tx.TxHash), Value: r}, nil
}

func (c *Cmd) HandlerReceipt(receipt *collect.ReceiptInterface) (*kafka.Message, error) {
Expand All @@ -835,7 +835,7 @@ func (c *Cmd) HandlerReceipt(receipt *collect.ReceiptInterface) (*kafka.Message,
}
r = b
}
return &kafka.Message{Topic: k.Topic, Partition: k.Partition, Time: time.Now(), Key: []byte(receipt.TransactionHash), Value: r}, nil
return &kafka.Message{Topic: fmt.Sprintf("%v-%v", c.chain.BlockChainCode, k.Topic), Partition: k.Partition, Time: time.Now(), Key: []byte(receipt.TransactionHash), Value: r}, nil
}

func (c *Cmd) Stop() {
Expand Down
36 changes: 18 additions & 18 deletions common/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,27 @@ import (
)

// dev
var defaultChainCode = map[string]map[int64]int8{
"ETH": {200: 1, 2001: 1},
"POLYGON": {201: 1, 2011: 1},
"BSC": {202: 1},
"TRON": {205: 1},
"BTC": {300: 1},
"FIL": {301: 1},
"XRP": {310: 1},
}

// main
//var defaultChainCode = map[string]map[int64]int8{
// "ETH": {60: 1, 6001: 1},
// "POLYGON": {62: 1, 6201: 1},
// "BSC": {2510: 1, 2610: 1},
// "TRON": {195: 1, 198: 1},
// "BTC": {0: 1, 1: 1},
// "FIL": {2307: 1},
// "XRP": {144: 1},
// "ETH": {200: 1, 2001: 1},
// "POLYGON": {201: 1, 2011: 1},
// "BSC": {202: 1},
// "TRON": {205: 1},
// "BTC": {300: 1},
// "FIL": {301: 1},
// "XRP": {310: 1},
//}

// main
var defaultChainCode = map[string]map[int64]int8{
"ETH": {60: 1, 6001: 1},
"POLYGON": {62: 1, 6201: 1},
"BSC": {2510: 1, 2610: 1},
"TRON": {195: 1, 198: 1},
"BTC": {0: 1, 1: 1},
"FIL": {2307: 1},
"XRP": {144: 1},
}

func LoadConfig(path string) (string, error) {
f, err := os.OpenFile(path, os.O_RDONLY, os.ModeAppend)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions store/service/store_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *StoreHandler) ReadBackupTxFromKafka(blockChain int64, kafkaCfg map[stri
Kafka := kafkaCfg["BackupTx"]
broker := fmt.Sprintf("%v:%v", Kafka.Host, Kafka.Port)
group := fmt.Sprintf("gr_store_backuptx_%v", Kafka.Group)
s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: fmt.Sprintf("%v_%v", Kafka.Topic, blockChain), Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2)
s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: fmt.Sprintf("%v-%v", Kafka.Topic, blockChain), Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2)
}()

list := make([]*store.BackupTx, 0, 20)
Expand Down Expand Up @@ -120,7 +120,7 @@ func (s *StoreHandler) ReadSubTxFromKafka(blockChain int64, kafkaCfg map[string]
Kafka := kafkaCfg["SubTx"]
broker := fmt.Sprintf("%v:%v", Kafka.Host, Kafka.Port)
group := fmt.Sprintf("gr_store_subtx_%v", Kafka.Group)
s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: Kafka.Topic, Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2)
s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: fmt.Sprintf("%v-%v", blockChain, Kafka.Topic), Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2)
}()

list := make([]*store.SubTx, 0, 20)
Expand Down Expand Up @@ -170,7 +170,7 @@ func (s *StoreHandler) ReadTxFromKafka(blockChain int64, kafkaCfg map[string]*co
Kafka := kafkaCfg["Tx"]
broker := fmt.Sprintf("%v:%v", Kafka.Host, Kafka.Port)
group := fmt.Sprintf("gr_store_tx_%v", Kafka.Group)
s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: Kafka.Topic, Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2)
s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: fmt.Sprintf("%v-%v", blockChain, Kafka.Topic), Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2)
}()

list := make([]*store.Tx, 0, 20)
Expand Down Expand Up @@ -221,7 +221,7 @@ func (s *StoreHandler) ReadBlockFromKafka(blockChain int64, kafkaCfg map[string]
Kafka := kafkaCfg["Block"]
broker := fmt.Sprintf("%v:%v", Kafka.Host, Kafka.Port)
group := fmt.Sprintf("gr_store_block_%v", Kafka.Group)
s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: Kafka.Topic, Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2)
s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: fmt.Sprintf("%v-%v", blockChain, Kafka.Topic), Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2)
}()

list := make([]*store.Block, 0, 20)
Expand Down Expand Up @@ -269,7 +269,7 @@ func (s *StoreHandler) ReadReceiptFromKafka(blockChain int64, kafkaCfg map[strin
Kafka := kafkaCfg["Receipt"]
broker := fmt.Sprintf("%v:%v", Kafka.Host, Kafka.Port)
group := fmt.Sprintf("gr_store_receipt_%v", Kafka.Group)
s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: Kafka.Topic, Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2)
s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: fmt.Sprintf("%v-%v", blockChain, Kafka.Topic), Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2)
}()

list := make([]*store.Receipt, 0, 20)
Expand Down
2 changes: 1 addition & 1 deletion store/service/ws_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (ws *WsHandler) sendMessage(SubKafkaConfig *config.KafkaConfig, kafkaConfig
//save to kafka
if len(pushMp) > 0 && SubKafkaConfig != nil {
r, _ := json.Marshal(tx)
m := &kafka.Message{Topic: SubKafkaConfig.Topic, Key: []byte(uuid.New().String()), Value: r}
m := &kafka.Message{Topic: fmt.Sprintf("%v-%v", blockChain, SubKafkaConfig.Topic), Key: []byte(uuid.New().String()), Value: r}
bufferMessage = append(bufferMessage, m)
}

Expand Down

0 comments on commit 9819b90

Please sign in to comment.