Skip to content

Commit

Permalink
Clean up vat package API.
Browse files Browse the repository at this point in the history
  • Loading branch information
lthibault committed May 25, 2024
1 parent e6e8a9d commit c97db08
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 35 deletions.
43 changes: 19 additions & 24 deletions vat/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package vat
import (
"context"
"log/slog"
"sync"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
Expand All @@ -12,28 +11,26 @@ import (

type ReleaseFunc func()

type StreamHandler struct {
Host host.Host
Proto protocol.ID
accept <-chan network.Stream
type ListenConfig struct {
Host host.Host
}

func (h *StreamHandler) Bind(ctx context.Context) ReleaseFunc {
func (c ListenConfig) Listen(ctx context.Context, id protocol.ID) Listener {
ctx, cancel := context.WithCancel(ctx)
ch := make(chan network.Stream)
*h = StreamHandler{
Host: h.Host,
Proto: h.Proto,
accept: ch,
}
c.Host.SetStreamHandler(id, NewStreamHandler(ctx, ch))

h.Host.SetStreamHandler(h.Proto, h.NewStreamHandler(ctx, ch))
return h.NewRelease(func() {
defer close(ch)
defer h.Host.RemoveStreamHandler(h.Proto)
})
return Listener{
C: ch,
Release: func() {
defer close(ch)
defer cancel()
c.Host.RemoveStreamHandler(id)
},
}
}

func (h *StreamHandler) NewStreamHandler(ctx context.Context, ch chan<- network.Stream) network.StreamHandler {
func NewStreamHandler(ctx context.Context, ch chan<- network.Stream) network.StreamHandler {
return func(s network.Stream) {
select {
case ch <- s:
Expand All @@ -54,16 +51,14 @@ func (h *StreamHandler) NewStreamHandler(ctx context.Context, ch chan<- network.
}
}

func (h *StreamHandler) NewRelease(f func()) ReleaseFunc {
var once sync.Once
return func() {
once.Do(f) // f is called at most once.
}
type Listener struct {
C <-chan network.Stream
Release ReleaseFunc
}

func (h *StreamHandler) Accept(ctx context.Context) (s network.Stream, err error) {
func (h Listener) Accept(ctx context.Context) (s network.Stream, err error) {
select {
case s = <-h.accept:
case s = <-h.C:
case <-ctx.Done():
err = ctx.Err()
}
Expand Down
113 changes: 111 additions & 2 deletions vat/streams_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,117 @@
package vat_test

import "testing"
import (
"context"
"testing"

"github.com/golang/mock/gomock"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
test_libp2p "github.com/wetware/go/test/libp2p"
"github.com/wetware/go/vat"
)

func TestStreamHandler(t *testing.T) {
t.Parallel()
t.Skip("TODO")

ctrl := gomock.NewController(t)
defer ctrl.Finish()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c := test_libp2p.NewMockConn(ctrl)
c.EXPECT().RemotePeer().
Return(peer.ID("test")).
Times(1)

s := test_libp2p.NewMockStream(ctrl)
s.EXPECT().Conn().
Return(c).
Times(1)
s.EXPECT().ID().
Return("test").
Times(1)
s.EXPECT().Protocol().
Return(protocol.ID("test")).
Times(1)

ch := make(chan network.Stream)
handle := vat.NewStreamHandler(ctx, ch)
go handle(s)

_, err := vat.Listener{C: ch}.Accept(ctx)
require.NoError(t, err)
}

// func TestXXX(t *testing.T) {
// t.Parallel()

// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()

// h, err := libp2p.New(
// libp2p.NoTransports,
// libp2p.NoListenAddrs,
// libp2p.Transport(inproc.New()),
// libp2p.ListenAddrStrings("/inproc/~"))
// require.NoError(t, err)
// defer h.Close()

// r := wazero.NewRuntime(ctx)
// defer r.Close(ctx)

// wasi.MustInstantiate(ctx, r)

// sys, err := system.Builder{}.Instantiate(ctx, r)
// require.NoError(t, err)
// defer sys.Close(ctx)

// b, err := os.ReadFile("testdata/socket/main.wasm")
// require.NoError(t, err)

// cm, err := r.CompileModule(ctx, b)
// require.NoError(t, err)
// defer cm.Close(ctx)

// ch := make(chan []byte, 1)
// defer close(ch)

// mod, err := r.InstantiateModule(ctx, cm, wazero.NewModuleConfig().
// WithName("test").
// WithStdin(sys.Stdin()).
// WithStdout(os.Stdout)) // support printf debugging in guest code
// require.NoError(t, err)
// defer mod.Close(ctx)

// proto := vat.ProtoFromModule(mod)
// handler := vat.HandlerConfig{
// Host: h,
// Proto: proto,
// }.Build(ctx)
// defer handler.Release()

// h2, err := libp2p.New(
// libp2p.NoTransports,
// libp2p.NoListenAddrs,
// libp2p.Transport(inproc.New()),
// libp2p.ListenAddrStrings("/inproc/~"))
// require.NoError(t, err)
// defer h2.Close()

// err = h2.Connect(ctx, *host.InfoFromHost(h))
// require.NoError(t, err)
// go func() {
// s, err := h2.NewStream(ctx, h.ID(), proto)
// require.NoError(t, err)
// defer s.Close()

// <-ctx.Done()
// }()

// s, err := handler.Accept(ctx)
// require.NoError(t, err)
// require.NotNil(t, s)
// }
12 changes: 5 additions & 7 deletions vat/vat.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,21 @@ func (c NetConfig) Proto() protocol.ID {
return protocol.ID(proto)
}

func (c NetConfig) Build() Network {
func (c NetConfig) Build(ctx context.Context) Network {
if c.DialTimeout <= 0 {
c.DialTimeout = time.Second * 10
}

return Network{
NetConfig: c,
Handler: &StreamHandler{Host: c.Host, Proto: c.Proto()},
Listener: ListenConfig{Host: c.Host}.Listen(ctx, c.Proto()),
}
}

type Network struct {
Host host.Host
NetConfig
Handler *StreamHandler
Listener
}

func (n Network) String() string {
Expand All @@ -63,9 +64,6 @@ func (n Network) BootstrapClient() capnp.Client {
}

func (n Network) Serve(ctx context.Context) error {
release := n.Handler.Bind(ctx)
defer release()

proc := n.System.Bind(n.Guest)
defer proc.Release()

Expand Down Expand Up @@ -131,7 +129,7 @@ func (n Network) Dial(id rpc.PeerID, opt *rpc.Options) (*rpc.Conn, error) {
// supplied Options for the connection. Generally, callers will
// want to invoke this in a loop when launching a server.
func (n Network) Accept(ctx context.Context, opt *rpc.Options) (*rpc.Conn, error) {
s, err := n.Handler.Accept(ctx)
s, err := n.Listener.Accept(ctx)
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions ww.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,14 @@ func (c Cluster) Serve(ctx context.Context) error {
}
defer mod.Close(ctx)

return vat.NetConfig{
net := vat.NetConfig{
Host: c.Host,
Guest: mod,
System: sys,
}.Build().Serve(ctx)
}.Build(ctx)
defer net.Release()

return net.Serve(ctx)
}

func (c Cluster) CompileModule(ctx context.Context, r wazero.Runtime) (wazero.CompiledModule, error) {
Expand Down

0 comments on commit c97db08

Please sign in to comment.