Skip to content

Commit

Permalink
Merge pull request #54 from lesismal/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
lesismal committed Nov 20, 2023
2 parents 802970d + 9b56db4 commit e32daca
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 232 deletions.
19 changes: 1 addition & 18 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,19 +750,6 @@ func (c *Client) run() {
}
}

func (c *Client) runWebsocket() {
c.mux.Lock()
defer c.mux.Unlock()
if !c.running {
c.running = true
c.initReader()
if c.Handler.AsyncWrite() {
go util.Safe(c.sendLoop)
}
c.Conn.(WebsocketConn).HandleWebsocket(c.recvLoop)
}
}

func (c *Client) initReader() {
if c.Handler.BatchRecv() {
c.Reader = c.Handler.WrapReader(c.Conn)
Expand Down Expand Up @@ -958,11 +945,7 @@ func newClientWithConn(conn net.Conn, codec codec.Codec, handler Handler, onStop
c.asyncHandlerMap = make(map[uint64]*asyncHandler)
c.onStop = onStop

if _, ok := conn.(WebsocketConn); !ok {
c.run()
} else {
c.runWebsocket()
}
c.run()

return c
}
Expand Down
79 changes: 41 additions & 38 deletions examples/httprpc/arpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ var _ErrDisconnected = "[error disconnected]";
var _ErrReconnecting = "[error reconnecting]";

function Codec() {
this.Marshal = function (obj) {
if (typeof (obj) == 'string') {
this.Marshal = function(obj) {
if (typeof(obj) == 'string') {
return new TextEncoder("utf-8").encode(obj);
}
return new TextEncoder("utf-8").encode(JSON.stringify(obj));
}
this.Unmarshal = function (data) {
this.Unmarshal = function(data) {
try {
data = JSON.parse(new TextDecoder("utf-8").decode(data));
return data;
Expand Down Expand Up @@ -69,90 +69,90 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {

this.state = _SOCK_STATE_CONNECTING;

this.handle = function (method, h) {
this.handle = function(method, h) {
if (this.handlers[method]) {
throw ("handler for [${method}] exists");
}
this.handlers[method] = { h: h };
}

this.callHttp = function (method, request, timeout, cb) {
this.callHttp = function(method, request, timeout, cb) {
this.call(method, request, timeout, cb, true);
}
this.call = function (method, request, timeout, cb, isHttp) {
this.call = function(method, request, timeout, cb, isHttp) {
if (this.state == _SOCK_STATE_CLOSED) {
if (typeof (cb) == 'function') {
if (typeof(cb) == 'function') {
cb({ data: null, err: _ErrClosed });
}
return new Promise(function (resolve, reject) {
return new Promise(function(resolve, reject) {
resolve({ data: null, err: _ErrClosed });
});
}
if (this.state == _SOCK_STATE_CONNECTING) {
if (typeof (cb) == 'function') {
if (typeof(cb) == 'function') {
cb({ data: null, err: _ErrReconnecting });
}
return new Promise(function (resolve, reject) {
return new Promise(function(resolve, reject) {
resolve({ data: null, err: _ErrReconnecting });
});
}
var session = {};
var p = new Promise(function (resolve, reject) {
var p = new Promise(function(resolve, reject) {
session.resolve = resolve;
});
if (typeof (cb) == 'function') {
if (typeof(cb) == 'function') {
session.resolve = cb;
}
this.sessionMap[seq] = session;

if (timeout > 0) {
session.timer = setTimeout(function () {
delete (client.sessionMap[seq]);
session.timer = setTimeout(function() {
delete(client.sessionMap[seq]);
session.resolve({ data: null, err: "timeout" });
}, timeout);
}
this.write(_CmdRequest, method, request, this.seqNum, this._onMessage, isHttp);
return p;
}

this.notifyHttp = function (method, notify) {
this.notifyHttp = function(method, notify) {
this.notify(method, notify, true);
}
this.notify = function (method, notify, isHttp) {
this.notify = function(method, notify, isHttp) {
if (this.state == _SOCK_STATE_CLOSED) {
return _ErrClosed;
}
if (this.state == _SOCK_STATE_CONNECTING) {
return _ErrReconnecting;
}
this.write(_CmdNotify, method, notify, function () { }, isHttp);
this.write(_CmdNotify, method, notify, function() {}, isHttp);
}
this.ping = function () {
this.ping = function() {
if (client.state == _SOCK_STATE_CLOSED) {
return _ErrClosed;
}
if (client.state == _SOCK_STATE_CONNECTING) {
return _ErrReconnecting;
}
client.write(_CmdPing, "", null, function () { });
client.write(_CmdPing, "", null, function() {});
}
this.pong = function () {
this.pong = function() {
if (client.state == _SOCK_STATE_CLOSED) {
return _ErrClosed;
}
if (client.state == _SOCK_STATE_CONNECTING) {
return _ErrReconnecting;
}
client.write(_CmdPong, "", null, function () { });
client.write(_CmdPong, "", null, function() {});
}
this.keepalive = function (timeout) {
this.keepalive = function(timeout) {
if (this._keepaliveInited) return;
this._keepaliveInited = true;
if (!timeout) timeout = 1000 * 30;
setInterval(this.ping, timeout);
this.keepaliveIntervalID = setInterval(this.ping, timeout);
}

this.write = function (cmd, method, arg, cb, isHttp) {
this.write = function(cmd, method, arg, cb, isHttp) {
var buffer;
if (arg) {
var data = this.codec.Marshal(arg);
Expand Down Expand Up @@ -187,24 +187,27 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {
}
}

this.shutdown = function () {
this.shutdown = function() {
this.ws.close();
this.state = _SOCK_STATE_CLOSED;
if (!!this.keepaliveIntervalID) {
clearInterval(this.keepaliveIntervalID);
}
}

this.request = function (data, cb) {
this.request = function(data, cb) {
let resolve;
let p = new Promise(function (res) {
let p = new Promise(function(res) {
resolve = res;
if (typeof (cb) == 'function') {
resolve = function (ret) {
if (typeof(cb) == 'function') {
resolve = function(ret) {
res(ret);
cb(ret);
}
}
let r = new XMLHttpRequest();
r.open(this.httpMethod, this.httpUrl, true);
r.onreadystatechange = function () {
r.onreadystatechange = function() {
if (r.readyState != 4) {
return;
}
Expand All @@ -220,7 +223,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {
return p;
}

this._onMessage = function (event) {
this._onMessage = function(event) {
try {
var offset = 0;
while (offset < event.data.byteLength) {
Expand Down Expand Up @@ -274,7 +277,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {
if (session.timer) {
clearTimeout(session.timer);
}
delete (client.sessionMap[seq]);
delete(client.sessionMap[seq]);
var data = client.codec.Unmarshal(bodyArr);
if (isError) {
session.resolve({ data: null, err: data });
Expand All @@ -296,7 +299,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {
}
}

this.init = function () {
this.init = function() {
console.log("[ArpcClient] init...");
if ('WebSocket' in window) {
client.ws = new WebSocket(this.url);
Expand All @@ -306,19 +309,19 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {
client.ws = new SockJS(this.url);
}

// 消息类型,不设置则默认为'text'
// if not set this to `arraybuffer`, it will be 'text' by default.
client.ws.binaryType = 'arraybuffer';

client.state = _SOCK_STATE_CONNECTING;

client.ws.onopen = function (event) {
client.ws.onopen = function(event) {
client.state = _SOCK_STATE_CONNECTED;
console.log("[ArpcClient] websocket onopen");
if (client.onOpen) {
client.onOpen(client);
}
};
client.ws.onclose = function (event) {
client.ws.onclose = function(event) {
console.log("[ArpcClient] websocket onclose");
if (client.onClose) {
client.onClose(client);
Expand All @@ -341,7 +344,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {
client.state = _SOCK_STATE_CONNECTING;
client.init();
};
client.ws.onerror = function (event) {
client.ws.onerror = function(event) {
console.log("[ArpcClient] websocket onerror");
if (client.onError) {
client.onError(client);
Expand All @@ -355,4 +358,4 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {
} catch (e) {
console.log("[ArpcClient] init() failed:", e);
}
}
}
2 changes: 1 addition & 1 deletion examples/nbio/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
stdSvr = arpc.NewServer()
nbioSvr = nbio.NewGopher(nbio.Config{})

pool = taskpool.NewMixedPool(1024*8, 1, 1024*8)
pool = taskpool.New(1024*8, 1024)

method = "/echo"
)
Expand Down
Loading

0 comments on commit e32daca

Please sign in to comment.