-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy path consumer.go
More file actions
123 lines (104 loc) · 3.15 KB
/
consumer.go
File metadata and controls
123 lines (104 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main
import (
"encoding/json"
"fmt"
"os"
"os/signal"
"syscall"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// main runs a long-lived Kafka consumer against the Bitquery streaming
// brokers: it subscribes to one topic, polls messages until SIGINT/SIGTERM
// (or all brokers go down), and commits offsets manually after each message
// has been processed (at-least-once delivery).
func main() {
	// Prerequisites: fill in your Bitquery credentials and the topic name.
	const username = "<YOUR USERNAME>"
	const password = "<YOUR PASSWORD>"
	const topic = "<TOPIC>" // e.g. "tron.broadcasted.transactions"
	// end of prerequisites

	// Kafka consumer configuration: SASL_SSL with SCRAM-SHA-512 auth and
	// client certificates. enable.auto.commit is false, so offsets are
	// committed explicitly in the poll loop below — without that commit the
	// group's offsets would never advance.
	config := &kafka.ConfigMap{
		"bootstrap.servers":                     "rpk0.bitquery.io:9093,rpk1.bitquery.io:9093,rpk2.bitquery.io:9093",
		"group.id":                              username + "-mygroup",
		"session.timeout.ms":                    30000,
		"security.protocol":                     "SASL_SSL",
		"ssl.ca.location":                       "server.cer.pem",
		"ssl.key.location":                      "client.key.pem",
		"ssl.certificate.location":              "client.cer.pem",
		"ssl.endpoint.identification.algorithm": "none",
		"sasl.mechanisms":                       "SCRAM-SHA-512",
		"sasl.username":                         username,
		"sasl.password":                         password,
		"auto.offset.reset":                     "latest",
		"enable.auto.commit":                    false,
	}

	// Initialize Kafka consumer.
	consumer, err := kafka.NewConsumer(config)
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		os.Exit(1)
	}

	// Subscribe to the topic.
	err = consumer.SubscribeTopics([]string{topic}, nil)
	if err != nil {
		fmt.Printf("Failed to subscribe to topic: %s\n", err)
		// Leave the group cleanly before exiting; os.Exit skips defers,
		// so close explicitly here.
		consumer.Close()
		os.Exit(1)
	}

	// Set up a channel to handle shutdown on SIGINT/SIGTERM.
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	// Poll messages and process them.
	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			ev := consumer.Poll(100)
			if ev == nil {
				continue
			}
			switch e := ev.(type) {
			case *kafka.Message:
				processMessage(e)
				// Manual commit: auto-commit is disabled in the config
				// above, so commit after successful processing to record
				// progress for this consumer group.
				if _, err := consumer.CommitMessage(e); err != nil {
					fmt.Printf("Failed to commit offset: %v\n", err)
				}
			case kafka.Error:
				// Most errors are informational; only treat the
				// all-brokers-down condition as fatal.
				fmt.Printf("Error: %v\n", e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run = false
				}
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}

	// Close down consumer.
	consumer.Close()
}
// processMessage logs a consumed Kafka message: it prints the message's
// topic/partition/offset, pretty-prints the payload as JSON (falling back
// to the raw bytes when the payload is not valid JSON), and emits a
// structured log entry to stdout.
func processMessage(msg *kafka.Message) {
	// TopicPartition.Topic is a *string and may be nil on some
	// library-generated events — guard once instead of dereferencing
	// blindly, which would panic the whole consumer.
	topic := "<unknown>"
	if msg.TopicPartition.Topic != nil {
		topic = *msg.TopicPartition.Topic
	}
	fmt.Printf("Received message on topic %s [%d] at offset %v:\n",
		topic, msg.TopicPartition.Partition, msg.TopicPartition.Offset)

	// Try to parse the message value as JSON.
	var jsonData interface{}
	err := json.Unmarshal(msg.Value, &jsonData)
	if err != nil {
		fmt.Printf("Error parsing JSON: %v\n", err)
		fmt.Printf("Raw message: %s\n", string(msg.Value))
		return
	}

	// Pretty print the JSON.
	prettyJSON, err := json.MarshalIndent(jsonData, "", " ")
	if err != nil {
		fmt.Printf("Error prettifying JSON: %v\n", err)
		return
	}
	fmt.Printf("Parsed JSON:\n%s\n", string(prettyJSON))

	// Log message data.
	logEntry := map[string]interface{}{
		"topic":     topic,
		"partition": msg.TopicPartition.Partition,
		"offset":    msg.TopicPartition.Offset,
		"key":       string(msg.Key),
		"value":     string(prettyJSON),
	}
	fmt.Printf("Log entry: %+v\n", logEntry)
	fmt.Println("----------------------------------------")
}