Skip to content

Commit

Permalink
Make ping pong work :)
Browse files Browse the repository at this point in the history
  • Loading branch information
bahner committed Jan 28, 2024
1 parent f43599d commit df1037d
Show file tree
Hide file tree
Showing 19 changed files with 114 additions and 65 deletions.
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ $(NAME): tidy

clean:
rm -f $(NAME)
make -C cmd/home clean
make -C cmd/pong clean
make -C cmd/relay clean

console:
Expand All @@ -44,8 +44,8 @@ distclean: clean
down:
docker-compose down

home:
make -C cmd/home
pong:
make -C cmd/pong

relay:
make -C cmd/relay go-ma-relay
Expand All @@ -56,8 +56,8 @@ image:
--build-arg "BUILD_IMAGE=$(BUILD_IMAGE)" \
.

install: relay home $(NAME)
sudo make -C cmd/home install
install: relay pong $(NAME)
sudo make -C cmd/pong install
sudo make -C cmd/relay install
sudo install -m755 $(NAME) $(DESTDIR)$(PREFIX)/bin/$(NAME)

Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion cmd/home/Makefile → cmd/pong/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/make -ef


NAME = go-ma-home
NAME = go-ma-pong
PREFIX ?= /usr/local

default: clean $(NAME)
Expand Down
7 changes: 7 additions & 0 deletions cmd/pong/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Pong!

This is is a silly, but really useful actor, which only sends back a pong!
upon receiving of a 間 message.

So you can leave it on, just so that you always have something
to reply to, when testing.
File renamed without changes.
File renamed without changes.
1 change: 1 addition & 0 deletions cmd/home/events.go → cmd/pong/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func handleEvents(ctx context.Context, a *actor.Actor) {

// Check if the message is from self to prevent pong loop
if m.From != a.Entity.DID.String() {
log.Debugf("Sending pong to %s over %s", m.From, a.Entity.DID.String())
err := pong(ctx, a, m)
if err != nil {
log.Errorf("Error sending pong: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/home/go.mod → cmd/pong/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module go-ma-home
module go-ma-pong

go 1.21.6

Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion cmd/home/main.go → cmd/pong/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func main() {

pflag.Parse()

config.Init("home")
config.Init("pong")
config.InitLogging()
config.InitP2P()
config.InitActor()
Expand Down
12 changes: 9 additions & 3 deletions cmd/home/messages.go → cmd/pong/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,25 @@ import (
// }

func pong(ctx context.Context, a *actor.Actor, m *msg.Message) error {
to, err := topic.GetOrCreate(m.From)

// Answer in the same channel, ie. my address. It's kinda like a broadcast to a "room"
to, err := topic.GetOrCreate(a.Entity.DID.String())
if err != nil {
return fmt.Errorf("failed subscribing to recipients topic: %w", errors.Cause(err))
}

// p means pong :-)
p, err := msg.New(m.To, m.From, []byte("Pong!"), "text/plain", a.Entity.Keyset.SigningKey.PrivKey)
if err != nil {
return fmt.Errorf("failed creating new message: %w", errors.Cause(err))
}

p.Send(ctx, to.Topic)
err = p.Send(ctx, to.Topic)
if err != nil {
return fmt.Errorf("failed sending message: %w", errors.Cause(err))
}

log.Debugf("Sending pong to %s", p.To)
log.Debugf("Sending pong to %s over %s", p.To, to.Topic.String())

return nil
}
File renamed without changes.
4 changes: 1 addition & 3 deletions cmd/relay/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (

func webHandler(w http.ResponseWriter, r *http.Request) {

desiredPeers := config.GetDesiredPeers()

allConnected := p.GetAllConnectedPeers()
if allConnected == nil {
log.Error("Failed to get connected peers.")
Expand All @@ -31,7 +29,7 @@ func webHandler(w http.ResponseWriter, r *http.Request) {
doc := New()
doc.Title = fmt.Sprintf("Bootstrap peer for rendezvous %s. Version %s ", ma.RENDEZVOUS, VERSION)
doc.H1 = fmt.Sprintf("%s@%s v%s", ma.RENDEZVOUS, (p.Node.ID().String()), VERSION)
doc.H1 += fmt.Sprintf("<br>Found %d/%d peers with rendezvous %s", len(peersWithRendezvous), desiredPeers, ma.RENDEZVOUS)
doc.H1 += fmt.Sprintf("<br>Found %d peers with rendezvous %s", len(peersWithRendezvous), ma.RENDEZVOUS)
doc.Addrs = p.Node.Addrs()
if allConnected == nil {
allConnected = peer.IDSlice{}
Expand Down
20 changes: 10 additions & 10 deletions config/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
const (
defaultLowWaterMark int = 3
defaultHighWaterMark int = 10
defaultDesiredPeers int = 3
// defaultDesiredPeers int = 3

defaultDiscoveryTimeout time.Duration = time.Second * 30
defaultConnMgrGrace time.Duration = time.Minute * 1
Expand All @@ -33,9 +33,9 @@ func init() {
viper.SetDefault("libp2p.connmgr.high_watermark", defaultHighWaterMark)
viper.BindPFlag("libp2p.connmgr.high_watermark", pflag.Lookup("high_watermark"))

pflag.Int("desired_peers", defaultDesiredPeers, "Desired number of peers to connect to.")
viper.SetDefault("libp2p.connmgr.desired_peers", defaultDesiredPeers)
viper.BindPFlag("libp2p.connmgr.desired_peers", pflag.Lookup("desired_peers"))
// pflag.Int("desired_peers", defaultDesiredPeers, "Desired number of peers to connect to.")
// viper.SetDefault("libp2p.connmgr.desired_peers", defaultDesiredPeers)
// viper.BindPFlag("libp2p.connmgr.desired_peers", pflag.Lookup("desired_peers"))

pflag.Duration("grace_period", defaultConnMgrGrace, "Grace period for connection manager.")
viper.SetDefault("libp2p.connmgr.grace_period", defaultConnMgrGrace)
Expand Down Expand Up @@ -167,13 +167,13 @@ func GetConnMgrGraceString() string {
return GetConnMgrGracePeriod().String()
}

func GetDesiredPeers() int {
return viper.GetInt("libp2p.connmgr.desired_peers")
}
// func GetDesiredPeers() int {
// return viper.GetInt("libp2p.connmgr.desired_peers")
// }

func GetDesiredPeersString() string {
return fmt.Sprint(GetDesiredPeers())
}
// func GetDesiredPeersString() string {
// return fmt.Sprint(GetDesiredPeers())
// }

func GetDiscoveryRetryInterval() time.Duration {
return viper.GetDuration("libp2p.discovery_retry")
Expand Down
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-pkcs11 v0.2.1-0.20230907215043-c6f79328ddf9/go.mod h1:6eQoGcuNJpa7jnd5pMGdkSaQpNDYvPlXWMcjXXThLlY=
github.com/google/gopacket v1.1.17/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM=
Expand Down
58 changes: 19 additions & 39 deletions p2p/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/bahner/go-ma"
"github.com/bahner/go-ma-actor/config"
Expand Down Expand Up @@ -93,14 +92,8 @@ func (d *DHT) Bootstrap(ctx context.Context) error {
// Takes a context and a DHT instance and discovers peers using the DHT.
// You might want to se server option or not for the DHT.
// Takes a variadic list of discovery.Option. You'll need this if you want to set a custom routing table.
func (d *DHT) DiscoverPeers(discoveryOpts ...discovery.Option) error {
desiredPeers := config.GetDesiredPeers()
log.Debugf("Starting DHT peer discovery searching for %d peers for rendezvous string: %s", desiredPeers, ma.RENDEZVOUS)

// ctx, cancel := config.GetDiscoveryContext()
// defer cancel()

ctx := context.Background()
func (d *DHT) DiscoverPeers(ctx context.Context, discoveryOpts ...discovery.Option) error {
log.Debugf("Starting DHT peer discovery searching for peers with rendezvous string: %s", ma.RENDEZVOUS)

log.Debugf("Peer discovery timeout: %v", config.GetDiscoveryTimeout())
log.Debugf("Peer discovery context %v", ctx)
Expand All @@ -114,48 +107,35 @@ func (d *DHT) DiscoverPeers(discoveryOpts ...discovery.Option) error {
dutil.Advertise(ctx, routingDiscovery, ma.RENDEZVOUS, discoveryOpts...)
log.Debugf("Advertising rendezvous string: %s", ma.RENDEZVOUS)

var foundPeers int
for foundPeers < desiredPeers {
log.Debugf("Searching for %d more peers", desiredPeers-foundPeers)
peerChan, err := routingDiscovery.FindPeers(ctx, ma.RENDEZVOUS, discoveryOpts...)
if err != nil {
return fmt.Errorf("dht:discovery: peer discovery error: %w", err)
}

peerChan, err := routingDiscovery.FindPeers(ctx, ma.RENDEZVOUS, discoveryOpts...)
if err != nil {
return fmt.Errorf("dht:discovery: peer discovery error: %w", err)
for p := range peerChan {
if p.ID == d.h.ID() {
continue // Skip self connection
}

for p := range peerChan {
if p.ID == d.h.ID() {
continue // Skip self connection
}

err := d.h.Connect(ctx, p)
if err != nil {
log.Debugf("Failed connecting to %s, error: %v", p.ID.String(), err)
continue
}

log.Debugf("Connected to DHT peer: %s", p.ID.String())
d.h.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
d.h.ConnManager().Protect(p.ID, ma.RENDEZVOUS)
foundPeers++
log.Debugf("(Found %d/%d peers)", foundPeers, desiredPeers)

if foundPeers >= desiredPeers {
log.Infof("Desired number of peers (%d) discovered.", foundPeers)
return nil
}
err := d.h.Connect(ctx, p)
if err != nil {
log.Debugf("Failed connecting to %s, error: %v", p.ID.String(), err)
d.h.ConnManager().UntagPeer(p.ID, ma.RENDEZVOUS)
d.h.ConnManager().Unprotect(p.ID, ma.RENDEZVOUS)
continue
}

log.Debugf("Connected to DHT peer: %s", p.ID.String())
d.h.ConnManager().TagPeer(p.ID, ma.RENDEZVOUS, 10)
d.h.ConnManager().Protect(p.ID, ma.RENDEZVOUS)

// Check if the context was cancelled or timed out
if ctx.Err() != nil {
log.Warn("Context cancelled or timed out, stopping DHT peer discovery.")
return ctx.Err()
}

log.Debugf("Sleeping for %v before retrying peer discovery.", config.GetDiscoveryRetryInterval())
time.Sleep(config.GetDiscoveryRetryInterval())
}

log.Info("DHT Peer discovery complete")
return nil
}
4 changes: 2 additions & 2 deletions p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (p *P2P) DiscoverPeers() error {
ctx, cancel := config.GetDiscoveryContext()
defer cancel()

err := p.DHT.DiscoverPeers()
err := p.DHT.DiscoverPeers(ctx)
if err != nil {
return fmt.Errorf("failed to initialise DHT. Peer discovery unsuccessful: %w", err)
}
Expand All @@ -50,7 +50,7 @@ func (p *P2P) DiscoveryLoop(ctx context.Context) {
case <-ctx.Done():
return
default:
p.DHT.DiscoverPeers()
p.DHT.DiscoverPeers(ctx)
sleepTime := config.GetDiscoveryRetryInterval()
log.Debugf("Sleeping for %s", sleepTime.String())
time.Sleep(sleepTime)
Expand Down
55 changes: 55 additions & 0 deletions ui/topic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package ui

import (
"context"

"github.com/bahner/go-ma-actor/p2p/topic"
log "github.com/sirupsen/logrus"
)

// Handle incoming messages. NB! This must be cancelled,
// when topic (localtion/home) changes.
func (ui *ChatUI) handleTopicEvents() {

ctx := context.Background()

t, err := topic.GetOrCreate(ui.e.DID)
envelopes := t.SubscribeEnvelopes(ctx)
if err != nil {
log.Debugf("topic creation error: %s", err)
return
}

for {
log.Debugf("Waiting for messages...")
select {
case e, ok := <-envelopes:
if !ok {
log.Debug("Envelope channel closed, exiting...")
return
}
log.Debugf("Received envelope: %v", e)

// Process the envelope and send a pong response
m, err := e.Open(ui.a.Entity.Keyset.EncryptionKey.PrivKey[:])
if err != nil {
log.Errorf("Error opening envelope: %v\n", err)
continue
}

log.Debugf("Received message: %v\n", string(m.Content))

// Check if the message is from self to prevent pong loop
if m.From != ui.a.Entity.DID.String() {
log.Debugf("Received message from %s", m.From)
ui.displayChatMessage(m)
} else {
log.Debugf("Received message from self, ignoring...")
}

case <-ctx.Done():
log.Debug("Context done, exiting...")
return
}
}
}
1 change: 1 addition & 0 deletions ui/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func NewChatUI(p *p2p.P2P, a *actor.Actor, id string) *ChatUI {
// the event loop for the text UI.
func (ui *ChatUI) Run() error {
go ui.handleEvents()
go ui.handleTopicEvents()
defer ui.end()

return ui.app.Run()
Expand Down

0 comments on commit df1037d

Please sign in to comment.