• Jump To … +
    client.js index.js server.js
  • server.js

  • ¶
    /*
     * Vibe Server
     * http://vibe-project.github.io/projects/vibe-protocol/
     * 
     * Copyright 2014 The Vibe Project 
     * Licensed under the Apache License, Version 2.0
     * http://www.apache.org/licenses/LICENSE-2.0
     */
    var events     = require("events"); 
    var url        = require("url"); 
    var uuid       = require("node-uuid");
    var WebSocket  = require("ws");
  • ¶

    This module is exposed to the parent module as `server`, a constructor of a server.

    module.exports = server;
  • ¶

    The server instance returned by this function is expected to consume exchanges of HTTP request and response as well as WebSockets, and to produce sockets. The HTTP and WebSocket protocols are standardized in RFC 2616 and RFC 6455, respectively.

    function server() {
        var server = new events.EventEmitter();
  • ¶

    These options are to negotiate the protocol. A client will connect to the server following them.

        var options = {
  • ¶

    A set of supported transports to be used by a client.

            transports: ["ws", "sse", "streamxhr", "streamxdr", "streamiframe", "longpollajax", "longpollxdr", "longpolljsonp"],
  • ¶

    A heartbeat interval in milliseconds.

            heartbeat: 20000,
  • ¶

    This exists only to speed up the heartbeat test and is generally not required. It is the time to wait for the server’s response. The default value is 5000.

            _heartbeat: 5000
        };
  • ¶

    Setter for options

        // Replaces the list of transport ids that is sent to the client in the
        // handshake result.
        server.setTransports = function(transports) {
            options.transports = transports;
        };
        // Replaces the heartbeat interval in milliseconds. It is sent to the client
        // in the handshake result and is also the delay of the server-side
        // heartbeat timer of each socket.
        server.setHeartbeat = function(heartbeat) {
            options.heartbeat = heartbeat;
        };
        // Replaces the _heartbeat value in milliseconds that is sent to the client
        // in the handshake result.
        server.set_heartbeat = function(_heartbeat) {
            options._heartbeat = _heartbeat;
        };
  • ¶

    A container for opened sockets.

        var sockets = {};

        // Returns the opened socket registered under the given id, or null if there
        // is none. Only own properties of the registry count, so an id such as
        // "constructor" that merely matches a member of Object.prototype is treated
        // as unknown instead of yielding a non-socket value.
        function findSocket(id) {
            return Object.prototype.hasOwnProperty.call(sockets, id) ? sockets[id] : null;
        }

        // Consumes an exchange of HTTP request and response. The query string of
        // the request URL is parsed into req.params and drives the dispatch below.
        server.handleRequest = function(req, res) {
            req.params = url.parse(req.url, true).query;
            // Any request must not be cached.
            res.setHeader("cache-control", "no-cache, no-store, must-revalidate");
            res.setHeader("pragma", "no-cache");
            res.setHeader("expires", "0");
            // streamxdr or longpollxdr transport requires CORS headers even in
            // same-origin connection.
            res.setHeader("access-control-allow-origin", req.headers.origin || "*");
            res.setHeader("access-control-allow-credentials", "true");
            var target;
            switch (req.method) {
            // GET method is used to establish a channel for the server to push
            // something to the client and manage transports.
            case "GET":
                // `when` param indicates a specific goal of GET request.
                switch (req.params.when) {
                // Negotiates the protocol. Information to connect to the server is
                // passed to the client.
                case "handshake":
                    // A result of handshaking is a JSON containing that information.
                    var result = JSON.stringify({
                        // An identifier for a socket the client will establish. It
                        // should be universally unique.
                        id: uuid.v4(),
                        transports: options.transports,
                        heartbeat: options.heartbeat,
                        _heartbeat: options._heartbeat
                    });
                    // An old client like browsers not implementing CORS may have to
                    // use JSONP because this request would be cross origin. If that
                    // is the case, `callback` parameter will be passed for JSONP.
                    // The JSON text is stringified once more so that the callback
                    // receives it as a JavaScript string literal.
                    // NOTE(review): the callback name is echoed without validation.
                    if (req.params.callback) {
                        result = req.params.callback + "(" + JSON.stringify(result) + ");";
                    }
                    res.end(result);
                    break;
                // Opens a new socket establishing required transport and fires the
                // socket event. `transport` param is an id of transport the client
                // uses.
                case "open":
                    // Only transports registered as own properties are accepted;
                    // inherited members like "hasOwnProperty" are not transports.
                    var transport = Object.prototype.hasOwnProperty.call(transports, req.params.transport) ?
                        transports[req.params.transport] : null;
                    if (transport) {
                        var s = socket(req.params, transport(req, res));
                        s.uri = req.url;
                        server.emit("socket", s);
                    // If the server doesn't support the required transport,
                    // responds with 501 Not Implemented.
                    } else {
                        res.statusCode = 501;
                        res.end();
                    }
                    break;
                // Injects a new exchange of request and response to the long
                // polling transport of the socket whose id is `id` param. In long
                // polling, a pseudo-connection consisting of disposable exchanges
                // pretends to be a persistent connection.
                case "poll":
                    target = findSocket(req.params.id);
                    if (target && typeof target.transport.refresh === "function") {
                        target.transport.refresh(req, res);
                    // If there is no corresponding socket, or its transport can not
                    // take a new exchange, responds with 500 Internal Server Error.
                    } else {
                        res.statusCode = 500;
                        res.end();
                    }
                    break;
                // It means the client considers the socket whose id is `id` param
                // as closed so abort the socket if the server couldn't detect it
                // for some reason.
                case "abort":
                    target = findSocket(req.params.id);
                    if (target) {
                        target.close();
                    }
                    // In case of browser, it is performed by script tag so set
                    // content-type header to text/javascript to avoid warning.
                    res.setHeader("content-type", "text/javascript; charset=utf-8");
                    res.end();
                    break;
                // If the given `when` param is unsupported, responds with 501 Not
                // Implemented.
                default:
                    res.statusCode = 501;
                    res.end();
                }
                break;
            // POST method is used to supply HTTP transport with message as a
            // channel for the client to push something to the server.
            case "POST":
                // Reads body to retrieve message. Only text data is allowed now.
                // Decoding is delegated to the stream so that a multi-byte
                // character split across two chunks is not corrupted.
                req.setEncoding("utf8");
                var body = "";
                req.on("data", function(chunk) {
                    body += chunk;
                });
                req.on("end", function() {
                    // Retrieves text message by stripping off leading `data=`.
                    var text = body.substring("data=".length);
                    // Fires a message event to the socket's transport whose id is
                    // `id` param with that text message.
                    var recipient = findSocket(req.params.id);
                    if (recipient) {
                        recipient.transport.emit("message", text);
                    // If the specified socket is not found, responds with 500
                    // Internal Server Error.
                    } else {
                        res.statusCode = 500;
                    }
                    res.end();
                });
                break;
            // If the method is neither GET nor POST, responds with 405 Method Not
            // Allowed.
            default:
                res.statusCode = 405;
                res.end();
            }
        };
  • ¶

    An HTTP upgrade is used for WebSocket transport.

        // A WebSocket server that does not listen by itself; upgrades are handed
        // to it explicitly.
        var webSocketFactory = new WebSocket.Server({noServer: true});
        // Consumes an HTTP upgrade. The query string of the request URL is parsed
        // into req.params before the upgrade is delegated to the factory.
        server.handleUpgrade = function(req, sock, head) {
            var parsedUrl = url.parse(req.url, true);
            req.params = parsedUrl.query;
            // Once a given request is upgraded to WebSocket, opens a new socket
            // using it and fires the socket event.
            function onUpgraded(ws) {
                var opened = socket(req.params, transports.ws(ws));
                opened.uri = req.url;
                server.emit("socket", opened);
            }
            webSocketFactory.handleUpgrade(req, sock, head, onUpgraded);
        };
  • ¶

    A socket is an interface to exchange event between the two endpoints and expected to be public for developers to create vibe application. The event is serialized to and deseriazlied from JSON specified in ECMA-404.

        // Creates a socket on top of the given transport and registers it in
        // `sockets` under params.id.
        //
        // - params: the parsed query parameters of the request that opened the
        //   socket; only `id` is read here.
        // - transport: an event emitter firing `error`, `close` and `message` and
        //   providing `send(text)` and `close()`.
        //
        // Returns the socket, an event emitter with `id`, `transport`,
        // `send(type, data, resolved, rejected)` and `close()`.
        function socket(params, transport) {
            var socket = new events.EventEmitter();
            // Assigns the id the client passed as an identifier of this socket.
            socket.id = params.id;
            // I don't recommend to expose transport but it's needed here for HTTP
            // transports.
            socket.transport = transport;
            // Propagates the transport's errors to socket.
            transport.on("error", function(error) {
                socket.emit("error", error);
            });
            // On the transport's close event, removes the socket from the
            // repository to make it have only opened sockets and fires the close
            // event.
            transport.on("close", function() {
                delete sockets[socket.id];
                socket.emit("close");
            });
            // When the underlying transport has received a message from the client.
            transport.on("message", function(text) {
                // Converts JSON to an event object. It should have the following
                // properties:
                //
                // - id: string: an event identifier.
                // - type: string: an event type.
                // - data: any: an event data.
                // - reply: boolean: true if this event requires the reply.
                //
                // The text comes from the client, so a message that is not valid
                // JSON or is not an object is reported through the socket's error
                // event, like transport errors, instead of being thrown from the
                // transport's message event.
                var event;
                try {
                    event = JSON.parse(text);
                } catch (error) {
                    socket.emit("error", error);
                    return;
                }
                if (event === null || typeof event !== "object") {
                    socket.emit("error", new Error("malformed event"));
                    return;
                }
                // If the client sends a plain event, dispatch it.
                if (!event.reply) {
                    socket.emit(event.type, event.data);
                // This is how to implement reply extension. An event handler for
                // the corresponding event will receive reply controller as 2nd
                // argument. It calls the client's resolved or rejected callback by
                // sending reply event.
                } else {
                    // The latch prevents double reply.
                    var latch;
                    socket.emit(event.type, event.data, {
                        resolve: function(value) {
                            if (!latch) {
                                latch = true;
                                socket.send("reply", {id: event.id, data: value, exception: false});
                            }
                        },
                        reject: function(reason) {
                            if (!latch) {
                                latch = true;
                                socket.send("reply", {id: event.id, data: reason, exception: true});
                            }
                        }
                    });
                }
            });
            // An auto-increment id for event. In case of long polling, these ids
            // are echoed back as a query string to the URL in GET, so a small sized
            // id is preferable. It is unique among events sent to the client and
            // has nothing to do with the ids the client sent.
            var eventId = 0;
            // A map for reply callbacks for reply extension.
            var callbacks = {};
            // Sends an event of the given type and data to the client. If resolved
            // or rejected is given, the event requires the reply and the callbacks
            // are stored until the reply event arrives.
            socket.send = function(type, data, resolved, rejected) {
                var event = {
                    id: "" + eventId++,
                    type: type,
                    data: data,
                    reply: resolved != null || rejected != null
                };
                // For reply extension, stores resolved and rejected callbacks if
                // they are given.
                if (event.reply) {
                    callbacks[event.id] = {resolved: resolved, rejected: rejected};
                }
                // Converts the event to JSON and sends it through the transport.
                transport.send(JSON.stringify(event));
            };
            // For reply extension, on the reply event, executes the stored reply
            // callbacks with data. A reply without payload or for an id that was
            // never stored is ignored.
            socket.on("reply", function(reply) {
                if (reply && Object.prototype.hasOwnProperty.call(callbacks, reply.id)) {
                    var cbs = callbacks[reply.id];
                    var fn = reply.exception ? cbs.rejected : cbs.resolved;
                    if (fn) {
                        fn.call(this, reply.data);
                    }
                    delete callbacks[reply.id];
                }
            });
            // Closes the socket by closing the transport.
            socket.close = function() {
                transport.close();
            };
            // Sets a timer to fire the error event and close the socket after the
            // heartbeat interval.
            var heartbeatTimer;
            function setHeartbeatTimer() {
                heartbeatTimer = setTimeout(function() {
                    socket.emit("error", new Error("heartbeat"));
                    socket.close();
                }, options.heartbeat);
            }
            setHeartbeatTimer();
            // On the heartbeat event from the client, cancels the timer, sets it up
            // again and sends the heartbeat event as a response.
            socket.on("heartbeat", function() {
                clearTimeout(heartbeatTimer);
                setHeartbeatTimer();
                socket.send("heartbeat");
            });
            // To prevent a side effect of the timer, clears it on the close event.
            socket.on("close", function() {
                clearTimeout(heartbeatTimer);
            });
            // Finally registers the newly created socket to the repository,
            // `sockets`, by id.
            sockets[socket.id] = socket;
            return socket;
        }
        
        return server;
    }
  • ¶

    A transport is used to establish a persistent connection, send data, receive data and close the connection, and is expected to be private so that users cannot access it.

    var transports = {};
  • ¶

    WebSocket is a protocol designed for full-duplex communication over a TCP connection. However, it’s not always available for various reasons.

    // Wraps an established WebSocket in a transport: the WebSocket's error,
    // close and message events are re-fired on the transport, and the
    // transport's send and close are delegated to the WebSocket.
    transports.ws = function(ws) {
        var bridge = new events.EventEmitter();
        // Outgoing direction: transport to WebSocket.
        bridge.send = function(data) {
            ws.send(data);
        };
        bridge.close = function() {
            ws.close();
        };
        // Incoming direction: WebSocket to transport. Only the data of a message
        // event is passed on.
        ws.onmessage = function(message) {
            bridge.emit("message", message.data);
        };
        ws.onclose = function() {
            bridge.emit("close");
        };
        ws.onerror = function(error) {
            bridge.emit("error", error);
        };
        return bridge;
    };
  • ¶

    HTTP Streaming is the way that the client establishes an HTTP persistent connection and watches for changes in the response text while the server prints chunks as data to the connection.

    sse stands for Server-Sent Events specified by W3C.

    // Creates a streaming transport over the given HTTP exchange. It is used for
    // every streaming transport; only the content-type differs for sse.
    transports.sse = function(req, res) {
        var transport = new events.EventEmitter();
        var padding = Array(2048).join(" ");
        // Whether the close event has been fired already.
        var finished = false;
        // Propagates HTTP exchange's errors to transport.
        function forwardError(error) {
            transport.emit("error", error);
        }
        req.on("error", forwardError);
        res.on("error", forwardError);
        // The content-type header should be text/event-stream for sse and
        // text/plain for the others, encoded in utf-8.
        var subtype = req.params.transport === "sse" ? "event-stream" : "plain";
        res.setHeader("content-type", "text/" + subtype + "; charset=utf-8");
        // The padding makes the client-side transport aware of change of the
        // response so that the client-side socket fires the open event. It is
        // composed of white space and ends with \n.
        res.write(padding + "\n");
        // When the client aborts the underlying connection or the server
        // completes the response, fires the close event exactly once.
        function notifyClosed() {
            if (!finished) {
                finished = true;
                transport.emit("close");
            }
        }
        res.on("close", notifyClosed);
        res.on("finish", notifyClosed);
        // The response text is formatted in the event stream format: data is
        // prepended with `data: ` and terminated by a blank line. Because data is
        // JSON, it doesn't need to be broken up by line.
        transport.send = function(data) {
            res.write("data: " + data + "\n\n");
        };
        // Ends the response. Accordingly, the finish event will be fired and the
        // close event follows.
        transport.close = function() {
            res.end();
        };
        return transport;
    };
  • ¶

    On the server, there is not much difference between them.

    transports.streamxhr = transports.streamxdr = transports.streamiframe = transports.sse;
  • ¶

    HTTP Long Polling is the way that the client establishes an HTTP persistent connection and the server ends the connection with data. Then, the client receives it and performs a request again and again.

    // Creates a long polling transport from the exchange of the open request.
    // Subsequent poll exchanges are supplied through transport.refresh(req, res).
    // Events are kept in a queue until the client acknowledges them through the
    // `lastEventIds` param of a later poll request.
    transports.longpollajax = function(req, res) {
        // Current holding response. It stays undefined until the first poll.
        var response;
        // Whether the transport is aborted or not.
        var aborted;
        // Whether the current response is closed or not.
        var closed;
        // Whether data is written on the current response or not.
        var written;
        // A timer to prevent from being idle connection.
        var closeTimer;
        // The parameters of the first request. That of subsequent requests are
        // not used.
        var params = req.params;
        // A queue containing events that the client couldn't receive.
        var queue = [];
        var transport = new events.EventEmitter();
        // In long polling, an exchange of request and response is disposable so
        // expose this method to supply with subsequent exchanges.
        transport.refresh = function(req, res) {
            // Propagates HTTP exchange's errors to transport.
            req.on("error", function(error) {
                transport.emit("error", error);
            });
            res.on("error", function(error) {
                transport.emit("error", error);
            });
            // The content-type header should be text/javascript for longpolljsonp
            // and text/plain for the others.
            res.setHeader("content-type", "text/" +
                (params.transport === "longpolljsonp" ? "javascript" : "plain") + "; charset=utf-8");
            // When client aborts the underlying connection or server completes the
            // response.
            function onclose() {
                if (onclose.done) {
                    return;
                }
                onclose.done = true;
                // The current exchange's life ends but this has nothing to do with
                // `written`.
                closed = true;
                // If the request is to poll and the server didn't write anything,
                // completion of this response is the end of the transport. Hence,
                // fires the close event.
                if (req.params.when === "poll" && !written) {
                    transport.emit("close");
                // Otherwise client will issue poll request again so it sets a
                // timer to fire close event to prevent this connection from
                // remaining in limbo.
                } else {
                    closeTimer = setTimeout(function() {
                        transport.emit("close");
                    }, 2000);
                }
            }
            res.on("close", onclose);
            res.on("finish", onclose);
            // If the request is to open, end the response. The purpose of this is
            // to tell the client that the server is alive.
            if (req.params.when === "open") {
                res.end();
            // If the request is to poll, remove the client-received events from
            // queue and flush the rest in queue if they exist.
            } else {
                // Resets the response, flags, timers as new exchange is supplied.
                response = res;
                closed = written = false;
                clearTimeout(closeTimer);
                // If aborted is true here, it means the user aborted the
                // connection but it couldn't be done on the previous exchange. So
                // ends the new exchange.
                if (aborted) {
                    res.end();
                    return;
                }
                // Removes client-received events from the queue. `lastEventIds`
                // param is comma-separated values of id of client-received events.
                // The queue is rebuilt rather than spliced while being iterated,
                // and an entry without a recognizable id is kept.
                if (req.params.lastEventIds) {
                    var receivedIds = req.params.lastEventIds.split(",");
                    queue = queue.filter(function(data) {
                        var match = /"id":"([^\"]+)"/.exec(data);
                        return !match || receivedIds.indexOf(match[1]) === -1;
                    });
                }
                // If cached events remain in the queue, it indicates the client
                // couldn't receive them. So flushes them in the form of JSON
                // array. This is not the same with JSON.stringify(queue) because
                // elements in queue are already JSON string.
                if (queue.length) {
                    transport.send("[" + queue.join(",") + "]", true);
                }
            }
        };
        // Refreshes with the first exchange.
        transport.refresh(req, res);
        transport.send = function(data, fromQueue) {
            // If data is not from the queue, caches it.
            if (!fromQueue) {
                queue.push(data);
            }
            // It's possible to send only when there is a poll response that is
            // neither closed nor already written. Otherwise the cached data will
            // be sent in next poll through refresh method. This covers sending
            // before the first poll arrives and sending twice before the finish
            // event of the current response.
            if (response && !closed && !written) {
                // Flags the current response is written.
                written = true;
                // In case of longpolljsonp, the response text is a JavaScript code
                // executing a callback with data. The callback name is the first
                // request's `callback` param and the data is escaped to a
                // JavaScript string literal. For others, no formatting is needed.
                // All the long polling transports finish the response here.
                // NOTE(review): the callback name is echoed without validation.
                response.end(params.transport === "longpolljsonp" ?
                    params.callback + "(" + JSON.stringify(data) + ");" : data);
            }
        };
        transport.close = function() {
            // Marks the transport is aborted.
            aborted = true;
            // Ends response if possible. If it's not possible, a next poll request
            // will be ended immediately by aborted flag.
            if (response && !closed) {
                response.end();
            }
        };
        return transport;
    };
  • ¶

    On the server, there is not much difference between them.

    transports.longpollxdr = transports.longpolljsonp = transports.longpollajax;