/*
* Vibe Server
* http://vibe-project.github.io/projects/vibe-protocol/
*
* Copyright 2014 The Vibe Project
* Licensed under the Apache License, Version 2.0
* http://www.apache.org/licenses/LICENSE-2.0
*/
/*
* Vibe Server
* http://vibe-project.github.io/projects/vibe-protocol/
*
* Copyright 2014 The Vibe Project
* Licensed under the Apache License, Version 2.0
* http://www.apache.org/licenses/LICENSE-2.0
*/
var events = require("events");
var url = require("url");
var WebSocket = require("ws");
This module is exposed to the parent module’s server
as a constructor of
server.
module.exports = server;
A server instance to be returned by this function is expected to consume exchange of HTTP request and response and WebSocket and produce socket. HTTP protocol and WebSocket protocol are standardized in RFC 2616 and RFC 6455, respectively.
function server() {
var sockets = {};
var server = new events.EventEmitter();
server.handleRequest = function(req, res) {
req.params = url.parse(req.url, true).query;
Any request must not be cached.
res.setHeader("cache-control", "no-cache, no-store, must-revalidate");
res.setHeader("pragma", "no-cache");
res.setHeader("expires", "0");
streamxdr
or longpollxdr
transport requires CORS headers even in
same-origin connection.
res.setHeader("access-control-allow-origin", req.headers.origin || "*");
res.setHeader("access-control-allow-credentials", "true");
switch (req.method) {
GET
method is used to establish a channel for the server to push
something to the client and manage transports.
case "GET":
when
param indicates a specific goal of GET
request.
switch (req.params.when) {
Open a new socket establishing required transport and fires the
socket
event. transport
param is an id of transport the client uses.
case "open":
switch (req.params.transport) {
case "sse": case "streamxhr": case "streamxdr": case "streamiframe":
server.emit("socket", socket(req.params, streamTransport(req, res)));
break;
case "longpollajax": case "longpollxdr": case "longpolljsonp":
server.emit("socket", socket(req.params, longpollTransport(req, res)));
break;
If the server doesn’t support the required transport,
responds with 501 Not Implemented
.
default:
res.statusCode = 501;
res.end();
}
break;
Inject a new exchange of request and response to the long polling
transport of the socket whose id is id
param. In long polling,
a pseudo-connection consisting of disposable exchanges pretends to
be a persistent connection.
case "poll":
if (req.params.id in sockets) {
sockets[req.params.id].transport.refresh(req, res);
} else {
If there is no corresponding socket, responds with 500
Internal Server Error
.
res.statusCode = 500;
res.end();
}
break;
It means the client considers the socket whose id is id
param
as closed so abort the socket if the server couldn’t detect it.
case "abort":
if (req.params.id in sockets) {
sockets[req.params.id].close();
}
In case of browser, it is performed by script tag so set
content-type header to text/javascript
to avoid warning.
res.setHeader("content-type", "text/javascript; charset=utf-8");
res.end();
break;
If the given when
param is unsupported, responds with 501 Not
Implemented
.
default:
res.statusCode = 501;
res.end();
}
break;
POST
method is used to supply HTTP transport with message
as a channel for the client to push something to the server.
case "POST":
Reads body to retrieve message. Only text data is allowed now.
var body = "";
req.on("data", function(chunk) {
body += chunk;
});
req.on("end", function() {
Make JSON string by stripping off leading data=
.
var text = body.substring("data=".length);
Fires a message event to the socket’s transport
whose id is id
param with the JSON string.
if (req.params.id in sockets) {
sockets[req.params.id].transport.emit("message", text);
If the specified socket is not found,
responds with 500 Internal Server Error
.
} else {
res.statusCode = 500;
}
res.end();
});
break;
If the method is neither GET
nor POST
, responds with 405 Method
Not Allowed
.
default:
res.statusCode = 405;
res.end();
}
};
var webSocketFactory = new WebSocket.Server({noServer: true});
server.handleUpgrade = function(req, sock, head) {
req.params = url.parse(req.url, true).query;
webSocketFactory.handleUpgrade(req, sock, head, function(ws) {
Once a given request is upgraded to WebSocket, open a new socket using it.
server.emit("socket", socket(req.params, wsTransport(ws)));
});
};
A transport is used to establish a persistent connection, send data, receive data and close the connection and is expected to be private for user not to access.
Covers ws
.
WebSocket is a protocol designed for a full-duplex communications over a
TCP connection. However, it’s not always available for various reason.
function wsTransport(ws) {
It delegates WebSocket’s events to transport and transport’s behaviors to WebSocket.
var transport = new events.EventEmitter();
ws.onclose = function() {
transport.emit("close");
};
ws.onmessage = function(event) {
transport.emit("message", event.data);
};
transport.send = function(data) {
ws.send(data);
};
transport.close = function() {
ws.close();
};
return transport;
}
Covers sse
, streamxhr
, streamxdr
, streamiframe
.
HTTP Streaming is the way that the client performs a HTTP persistent
connection and watches changes in response text and the server prints
chunk as data to the connection.
sse
stands for Server-Sent Events
specified by W3C.
function streamTransport(req, res) {
var text2KB = Array(2048).join(" ");
var transport = new events.EventEmitter();
The content-type headers should be text/event-stream
for sse
and
text/plain
for others. Also the response should be encoded in
utf-8
format for sse
.
res.setHeader("content-type", "text/" +
(req.params.transport === "sse" ? "event-stream" : "plain") + "; charset=utf-8");
The padding is required, which makes the client-side transport be aware
of change of the response and the client-side socket fire open event.
It should be greater than 1KB, be composed of white space character and
end with \r
, \n
or \r\n
. It applies to streamxdr
, streamiframe
.
res.write(text2KB + "\n");
When either client or server closes the transport, fires a close event.
function onclose() {
if (onclose.done) {
return;
}
onclose.done = true;
transport.emit("close");
}
res.on("close", onclose);
res.on("finish", onclose);
The response text should be formatted in the event stream
format.
This is specified in sse
spec but the rest also accept that format
for convenience. According to the format, data should be broken up by
\r
, \n
, or \r\n
but because data is JSON, it’s not needed. So
prepend ‘data: ‘ and append \n\n
to the data.
transport.send = function(data) {
res.write("data: " + data + "\n\n");
};
Ends the response. Accordingly, onclose
will be executed and the close
event will be fired. Don’t do that by yourself.
transport.close = function() {
res.end();
};
return transport;
}
Covers longpollajax
, longpollxdr
, longpolljsonp
.
HTTP Long Polling is the way that the client performs a HTTP persistent
connection and the server ends the connection with data. Then, the client
receives it and performs a request again and again.
function longpollTransport(req, res) {
Current holding response.
var response;
Whether the transport is aborted or not.
var aborted;
Whether the current response has ended or not.
var ended;
Whether data is written on the current response or not.
if this is true, then ended
is also true but not vice versa.
var written;
A timer to prevent from being idle connection.
var closeTimer;
The parameters of the first request. That of subsequent requests are not used.
var params = req.params;
A queue containing events that the client couldn’t receive.
var queue = [];
var transport = new events.EventEmitter();
In long polling, an exchange of request and response is disposable so expose this method to supply with subsequent exchanges.
transport.refresh = function(req, res) {
The content-type header should be text/javascript
for longpolljsonp
and text/plain
for the others.
res.setHeader("content-type", "text/" +
(params.transport === "longpolljsonp" ? "javascript" : "plain") + "; charset=utf-8");
When either client or server closes the current exchange,
function onclose() {
if (onclose.done) {
return;
}
onclose.done = true;
The current exchange’s life ends but this has nothing to do with
written
.
ended = true;
If the request is to poll
and the server didn’t write anything,
completion of this response is the end of the transport.
Hence, fires the close event.
if (req.params.when === "poll" && !written) {
transport.emit("close");
Otherwise client will issue poll
request again so it sets
a timer to fire close event to prevent this connection from
remaining in limbo. 2s is enough.
} else {
closeTimer = setTimeout(function() {
transport.emit("close");
}, 2000);
}
}
res.on("close", onclose);
res.on("finish", onclose);
If the request is to open
, end the response. The purpose of this is
to tell the client that the server is alive. Therefore, the client
will fire the open event.
if (req.params.when === "open") {
res.end();
If the request is to poll
, remove the client-received data from queue
and flush the rest in queue if they exist.
} else {
Resets the response, flags, timers as new exchange is supplied.
response = res;
ended = written = false;
clearTimeout(closeTimer);
If aborted is true, it means the user aborted the connection but it couldn’t be done because the current response is already ended for other reason. So end the new exchange.
if (aborted) {
res.end();
return;
}
Removes client-received events from the queue. lastEventIds
param
is a comma-separated values of id of client-received events.
FYI, a.splice(b, 1)
in JavaScript is equal to a.remove(b)
.
if (req.params.lastEventIds) {
req.params.lastEventIds.split(",").forEach(function(lastEventId) {
queue.forEach(function(data) {
if (lastEventId === /"id":"([^\"]+)"/.exec(data)[1]) {
queue.splice(queue.indexOf(data), 1);
}
});
});
}
If cached data remain in the queue, it indicates the client
couldn’t receive them. So flushes them in the form of
JSON array. This is not the same with JSON.stringify(queue)
because elements in queue are already JSON string.
if (queue.length) {
transport.send("[" + queue.join(",") + "]", true);
}
}
};
Refreshes with the first exchange.
transport.refresh(req, res);
transport.send = function(data, fromQueue) {
If data is not from the queue, caches it.
if (!fromQueue) {
queue.push(data);
}
Only when the current response is not ended it’s possible to send.
If it is ended, the cached data will be sent in next poll through
refresh
method.
if (!ended) {
Flags the current response is written.
written = true;
In case of longpolljsonp
, the response text is supposed to be a
JavaScript code executing a callback with data. The callback name
is passed as the first request’s callback
param and the data to
be returned have to be escaped to a JavaScript string literal.
For others, no formatting is needed. All the long polling
transports has to finish the request after processing. The
onclose
will be executed after this.
response.end(params.transport === "longpolljsonp" ?
params.callback + "(" + JSON.stringify(data) + ");" : data);
}
};
transport.close = function() {
Marks the transport is aborted.
aborted = true;
Ends response if possible. If it’s not possible, a next poll request
will be ended immediately by aborted
flag so it will fire the close
event. So you don’t need to manually dispatch the close event here.
if (!ended) {
response.end();
}
};
return transport;
}
A socket is an interface to exchange event between the two endpoints and expected to be public for developers to create vibe application. The event is serialized to and deseriazlied from JSON specified in ECMA-404.
function socket(params, transport) {
var socket = new events.EventEmitter();
Assigns an id that is UUID generated by client.
socket.id = params.id;
I don’t recommend to expose transport but it’s needed here for HTTP transports.
socket.transport = transport;
On the transport’s close event, removes the socket from the repository to make it have only opened sockets and fires the close event.
transport.on("close", function() {
delete sockets[socket.id];
socket.emit("close");
});
transport.on("message", function(text) {
Converts JSON to an event object.
It should have the following properties:
id: string
: an event identifier.type: string
: an event type.data: any
: an event data.If the server implements reply
extension, the following
properties should be considered as well.
reply: boolean
: true if this event requires the reply. var event = JSON.parse(text);
If the client sends a plain event, dispatch it.
if (!event.reply) {
socket.emit(event.type, event.data);
This is how to implement reply
extension. An event handler
for the corresponding event will receive reply controller
as 2nd argument. It calls the client’s resolved or rejected
callback by sending reply
event.
} else {
The latch prevents double reply.
var latch;
socket.emit(event.type, event.data, {
resolve: function(value) {
if (!latch) {
latch = true;
socket.send("reply", {id: event.id, data: value, exception: false});
}
},
reject: function(reason) {
if (!latch) {
latch = true;
socket.send("reply", {id: event.id, data: reason, exception: true});
}
}
});
}
});
An auto-increment id for event. In case of long polling, these ids are
echoed back as a query string to the URL in GET. To avoid 414
Request-URI Too Long
error, though it is not that important, it
would be better to use small sized id.
var eventId = 0;
A map for reply callbacks for reply
extension.
var callbacks = {};
socket.send = function(type, data, resolved, rejected) {
It should have the following properties:
id: string
: an event identifier.type: string
: an event type.data: any
: an event data.If the server implements reply
extension, the following
properties should be available as well.
reply: boolean
: true if this event requires the reply. var event = {
id: "" + eventId++,
type: type,
data: data,
reply: resolved != null || rejected != null
};
For reply
extension, stores resolved and rejected callbacks if
they are given.
if (event.reply) {
callbacks[event.id] = {resolved: resolved, rejected: rejected};
}
Convert the event to JSON and sends it through the transport.
transport.send(JSON.stringify(event));
};
For reply
extension, on the reply event, executes the stored reply
callbacks with data.
socket.on("reply", function(reply) {
if (reply.id in callbacks) {
var cbs = callbacks[reply.id];
var fn = reply.exception ? cbs.rejected : cbs.resolved;
if (fn) {
fn.call(this, reply.data);
}
delete callbacks[reply.id];
}
});
socket.close = function() {
transport.close();
};
If heartbeat
param is not false
and is a number, prepares
the heartbeat handshakes. FYI +"false"
gives NaN
equal to false
and +"5000"
gives 5000
equal to true
in JavaScript.
if (+params.heartbeat) {
Sets a timer to close the socket after the heartbeat interval.
var heartbeatTimer;
function setHeartbeatTimer() {
heartbeatTimer = setTimeout(function() {
socket.close();
}, +params.heartbeat);
}
setHeartbeatTimer();
The client will start to heartbeat on its open event and send the heartbaet event periodically. Then, cancels the timer, sets it up again and sends the heartbeat event as a response.
socket.on("heartbeat", function() {
clearTimeout(heartbeatTimer);
setHeartbeatTimer();
socket.send("heartbeat");
})
To prevent a side effect of the timer, clears it on the close event.
.on("close", function() {
clearTimeout(heartbeatTimer);
});
}
Finally registers the newly created socket to the repository,
sockets
, by id.
sockets[socket.id] = socket;
return socket;
}
return server;
}