Skip to content

Commit

Permalink
- support client sync write
Browse files Browse the repository at this point in the history
  • Loading branch information
lesismal committed Aug 3, 2021
1 parent 84a1a2b commit 9c6a620
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 43 deletions.
165 changes: 123 additions & 42 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Client struct {
Dialer DialerFunc
Head Header

IsAsync bool
IsAsyncWrite bool

running bool
reconnecting bool
Expand Down Expand Up @@ -131,14 +131,25 @@ func (c *Client) Call(method string, req interface{}, rsp interface{}, timeout t
c.deleteSession(seq)
}()

select {
case c.chSend <- msg:
case <-timer.C:
// c.Handler.OnOverstock(c, msg)
return ErrClientTimeout
case <-c.chClose:
// c.Handler.OnOverstock(c, msg)
return ErrClientStopped
if c.IsAsyncWrite {
select {
case c.chSend <- msg:
case <-timer.C:
// c.Handler.OnOverstock(c, msg)
return ErrClientTimeout
case <-c.chClose:
// c.Handler.OnOverstock(c, msg)
return ErrClientStopped
}
} else {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
if _, err := c.Handler.Send(c.Conn, msg.Buffer); err != nil {
c.Conn.Close()
return err
}
}

select {
Expand All @@ -165,14 +176,25 @@ func (c *Client) CallWith(ctx context.Context, method string, req interface{}, r
c.addSession(seq, sess)
defer c.deleteSession(seq)

select {
case c.chSend <- msg:
case <-ctx.Done():
// c.Handler.OnOverstock(c, msg)
return ErrClientTimeout
case <-c.chClose:
// c.Handler.OnOverstock(c, msg)
return ErrClientStopped
if c.IsAsyncWrite {
select {
case c.chSend <- msg:
case <-ctx.Done():
// c.Handler.OnOverstock(c, msg)
return ErrClientTimeout
case <-c.chClose:
// c.Handler.OnOverstock(c, msg)
return ErrClientStopped
}
} else {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
if _, err := c.Handler.Send(c.Conn, msg.Buffer); err != nil {
c.Conn.Close()
return err
}
}

select {
Expand Down Expand Up @@ -208,19 +230,29 @@ func (c *Client) CallAsync(method string, req interface{}, handler HandlerFunc,
defer timer.Stop()
}

switch timeout {
case TimeZero:
err = c.pushMessage(msg, nil)
default:
err = c.pushMessage(msg, timer)
if c.IsAsyncWrite {
switch timeout {
case TimeZero:
err = c.pushMessage(msg, nil)
default:
err = c.pushMessage(msg, timer)
}
} else {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
_, err = c.Handler.Send(c.Conn, msg.Buffer)
if err != nil {
c.Conn.Close()
}
}

if err != nil && handler != nil {
c.deleteAsyncHandler(seq)
}

return err

}

// Notify makes a notify with timeout.
Expand All @@ -232,13 +264,25 @@ func (c *Client) Notify(method string, data interface{}, timeout time.Duration,
}

msg := c.newRequestMessage(CmdNotify, method, data, false, true, args...)
switch timeout {
case TimeZero:
err = c.pushMessage(msg, nil)
default:
timer := time.NewTimer(timeout)
defer timer.Stop()
err = c.pushMessage(msg, timer)

if c.IsAsyncWrite {
switch timeout {
case TimeZero:
err = c.pushMessage(msg, nil)
default:
timer := time.NewTimer(timeout)
defer timer.Stop()
err = c.pushMessage(msg, timer)
}
} else {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
_, err = c.Handler.Send(c.Conn, msg.Buffer)
if err != nil {
c.Conn.Close()
}
}

return err
Expand All @@ -253,14 +297,25 @@ func (c *Client) NotifyWith(ctx context.Context, method string, data interface{}

msg := c.newRequestMessage(CmdNotify, method, data, false, true, args...)

select {
case c.chSend <- msg:
case <-ctx.Done():
// c.Handler.OnOverstock(c, msg)
return ErrClientTimeout
case <-c.chClose:
// c.Handler.OnOverstock(c, msg)
return ErrClientStopped
if c.IsAsyncWrite {
select {
case c.chSend <- msg:
case <-ctx.Done():
// c.Handler.OnOverstock(c, msg)
return ErrClientTimeout
case <-c.chClose:
// c.Handler.OnOverstock(c, msg)
return ErrClientStopped
}
} else {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
if _, err := c.Handler.Send(c.Conn, msg.Buffer); err != nil {
c.Conn.Close()
return err
}
}

return nil
Expand All @@ -273,6 +328,18 @@ func (c *Client) PushMsg(msg *Message, timeout time.Duration) error {
return err
}

if !c.IsAsyncWrite {
coders := c.Handler.Coders()
for j := 0; j < len(coders); j++ {
msg = coders[j].Encode(c, msg)
}
_, err := c.Handler.Send(c.Conn, msg.Buffer)
if err != nil {
c.Conn.Close()
}
return err
}

if timeout < 0 {
timeout = TimeForever
}
Expand Down Expand Up @@ -323,7 +390,9 @@ func (c *Client) Restart() error {
c.values = map[interface{}]interface{}{}

c.initReader()
go util.Safe(c.sendLoop)
if c.IsAsyncWrite {
go util.Safe(c.sendLoop)
}
go util.Safe(c.recvLoop)

c.running = true
Expand Down Expand Up @@ -553,7 +622,9 @@ func (c *Client) run() {
if !c.running {
c.running = true
c.initReader()
go util.Safe(c.sendLoop)
if c.IsAsyncWrite {
go util.Safe(c.sendLoop)
}
go util.Safe(c.recvLoop)
}
}
Expand All @@ -564,7 +635,9 @@ func (c *Client) runWebsocket() {
if !c.running {
c.running = true
c.initReader()
go util.Safe(c.sendLoop)
if c.IsAsyncWrite {
go util.Safe(c.sendLoop)
}
c.Conn.(WebsocketConn).HandleWebsocket(c.recvLoop)
}
}
Expand Down Expand Up @@ -749,12 +822,19 @@ func newClientWithConn(conn net.Conn, codec codec.Codec, handler Handler, onStop
}

// NewClient creates a Client.
func NewClient(dialer DialerFunc) (*Client, error) {
func NewClient(dialer DialerFunc, args ...interface{}) (*Client, error) {
conn, err := dialer()
if err != nil {
return nil, err
}

isAsyncWrite := true
if len(args) > 0 {
if asyncWrite, ok := args[0].(bool); ok {
isAsyncWrite = asyncWrite
}
}

c := &Client{}
c.Conn = conn
c.Codec = codec.DefaultCodec
Expand All @@ -765,6 +845,7 @@ func NewClient(dialer DialerFunc) (*Client, error) {
c.chClose = make(chan util.Empty)
c.sessionMap = make(map[uint64]*rpcSession)
c.asyncHandlerMap = make(map[uint64]HandlerFunc)
c.IsAsyncWrite = isAsyncWrite

c.run()

Expand Down
2 changes: 1 addition & 1 deletion context.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (ctx *Context) Value(key interface{}) interface{} {

func (ctx *Context) write(v interface{}, isError bool, timeout time.Duration) error {
cli := ctx.Client
if cli.IsAsync {
if cli.IsAsyncWrite {
return ctx.writeDirectly(v, isError)
}
req := ctx.Message
Expand Down

0 comments on commit 9c6a620

Please sign in to comment.