Skip to content

Commit

Permalink
Refactor peersMap to hold AddrInfo and be prunable
Browse files Browse the repository at this point in the history
  • Loading branch information
bahner committed Nov 25, 2023
1 parent 1270351 commit c681a35
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 46 deletions.
12 changes: 6 additions & 6 deletions p2p/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,24 @@ discoveryLoop:

for {
select {
case peer, ok := <-peerChan:
case p, ok := <-peerChan:
if !ok {
peerChan = nil
break
}
if peer.ID == h.ID() {
if p.ID == h.ID() {
continue // Skip self connection
}

err := h.Connect(ctx, peer) // Using the outer context directly
err := h.Connect(ctx, p) // Using the outer context directly
if err != nil {
log.Debugf("Failed connecting to %s, error: %v", peer.ID.String(), err)
log.Debugf("Failed connecting to %s, error: %v", p.ID.String(), err)
} else {
log.Infof("Connected to DHT peer: %s", peer.ID.String())
log.Infof("Connected to DHT peer: %s", p.ID.String())

// Add peer to list of known peers
peerMutex.Lock()
connectedPeers[peer.ID.String()] = struct{}{}
connectedPeers[p.ID.String()] = &p
peerMutex.Unlock()

break discoveryLoop
Expand Down
14 changes: 7 additions & 7 deletions p2p/mdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,25 @@ func DiscoverMDNSPeers(ctx context.Context, h host.Host) error {
discoveryLoop:
for {
select {
case peer, ok := <-peerChan:
case p, ok := <-peerChan:
if !ok {
log.Debug("MDNS peer channel closed.")
break discoveryLoop
}
if peer.ID == h.ID() {
if p.ID == h.ID() {
continue // Skip self connection
}

log.Infof("Found MDNS peer: %s connecting", peer.ID.String())
err := h.Connect(ctx, peer)
log.Infof("Found MDNS peer: %s connecting", p.ID.String())
err := h.Connect(ctx, p)
if err != nil {
log.Debugf("Failed connecting to %s, error: %v", peer.ID.String(), err)
log.Debugf("Failed connecting to %s, error: %v", p.ID.String(), err)
} else {
log.Infof("Connected to MDNS peer: %s", peer.ID.String())
log.Infof("Connected to MDNS peer: %s", p.ID.String())

// Add peer to list of known peers
peerMutex.Lock()
connectedPeers[peer.ID.String()] = struct{}{}
connectedPeers[p.ID.String()] = &p
peerMutex.Unlock()

break discoveryLoop
Expand Down
37 changes: 29 additions & 8 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/bahner/go-ma/key/ipns"
p2ppubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
log "github.com/sirupsen/logrus"
)

var (
Expand All @@ -22,7 +24,7 @@ var (
n host.Host
ps *p2ppubsub.PubSub

connectedPeers = make(map[string]struct{})
connectedPeers = make(map[string]*peer.AddrInfo)
peerMutex sync.Mutex
)

Expand Down Expand Up @@ -75,13 +77,32 @@ func GetNode() host.Host {
return n
}

func GetConnectedPeers() []string {
peerMutex.Lock()
defer peerMutex.Unlock()
// Get list of connectpeers.
// The connectTimeout is how long to wait for a connection to be established.
// This applies to each host in turn.
// If set to 0 a default timeout of 5 seconds will be used.
func GetConnectedPeers(connectTimeout time.Duration) map[string]*peer.AddrInfo {
defaultTimeoutSeconds := 5

peers := make([]string, 0, len(connectedPeers))
for peer := range connectedPeers {
peers = append(peers, peer)
if connectTimeout == 0 {
connectTimeout = time.Duration(defaultTimeoutSeconds) * time.Second
}
return peers

for p, addrs := range connectedPeers {

ctx, cancel := context.WithTimeout(context.Background(), connectTimeout)
defer cancel()

// Try connecting to the peer
if err := n.Connect(ctx, *addrs); err != nil {
log.Debugf("Failed connecting to %s, error: %v. Pruning.", p, err)

peerMutex.Lock()
delete(connectedPeers, p)
peerMutex.Unlock()
}
}

// No need to copy the peers again, as the new hosts are already live
return connectedPeers
}
25 changes: 18 additions & 7 deletions peer/peer.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,38 @@
package peer

import p2peer "github.com/libp2p/go-libp2p/core/peer"

type Peer struct {
// ID is the peer's ID
ID string
// Name is the peer's name
Alias string
// AddrInfo
AddrInfo *p2peer.AddrInfo
}

func New(id string, alias string) *Peer {
func NewWithAlias(addrInfo *p2peer.AddrInfo, alias string) *Peer {

id := addrInfo.ID.String()
return &Peer{
ID: id,
Alias: alias,
ID: id,
Alias: alias,
AddrInfo: addrInfo,
}
}

func NewFromID(id string) *Peer {
return New(id, id[len(id)-8:])
func New(addrInfo *p2peer.AddrInfo) *Peer {
alias := addrInfo.ID.String()
return NewWithAlias(addrInfo, alias)
}

func GetOrCreate(id string) *Peer {
func GetOrCreate(addrInfo *p2peer.AddrInfo) *Peer {

id := addrInfo.ID.String()

p := Get(id)
if p == nil {
p = NewFromID(id)
p = New(addrInfo)
Add(p)
}
return p
Expand Down
23 changes: 5 additions & 18 deletions ui/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ui
import (
"fmt"
"sort"
"time"

"github.com/bahner/go-ma-actor/p2p"
"github.com/bahner/go-ma-actor/peer"
Expand All @@ -24,7 +25,10 @@ func (ui *ChatUI) handleAliasCommand(args []string) {
// refreshPeers pulls the list of peers currently in the chat room and
// displays the last 8 chars of their peer id in the Peers panel in the ui.
func (ui *ChatUI) refreshPeers() {
peers := p2p.GetConnectedPeers()

// Tweak this to change the timeout for peer discovery
peerConnectTimeout := 2
peers := p2p.GetConnectedPeers(time.Duration(peerConnectTimeout) * time.Second)

// clear is thread-safe
ui.peersList.Clear()
Expand All @@ -46,20 +50,3 @@ func (ui *ChatUI) refreshPeers() {

ui.app.Draw()
}

// func (ui *ChatUI) handleStatusCommand(args []string) {
// if len(args) > 1 {
// switch args[1] {
// case "sub":
// ui.displayStatusMessage(ui.getStatusSub())
// case "topic":
// ui.displayStatusMessage(ui.getStatusTopic())
// case "host":
// ui.displayStatusMessage(ui.getStatusHost())
// default:
// ui.displaySystemMessage("Unknown status type: " + args[1])
// }
// } else {
// ui.displaySystemMessage("Usage: /status [sub|topic|host]")
// }
// }

0 comments on commit c681a35

Please sign in to comment.