270 lines
5.4 KiB
JavaScript
270 lines
5.4 KiB
JavaScript
|
|
/*!
|
|
* socket.io-node
|
|
* Copyright(c) 2011 LearnBoost <dev@learnboost.com>
|
|
* MIT Licensed
|
|
*/
|
|
|
|
/**
|
|
* Module dependencies.
|
|
*/
|
|
|
|
var crypto = require('crypto')
|
|
, Store = require('../store')
|
|
, assert = require('assert');
|
|
|
|
/**
|
|
* Exports the constructor.
|
|
*/
|
|
|
|
exports = module.exports = Redis;
|
|
Redis.Client = Client;
|
|
|
|
/**
|
|
* Redis store.
|
|
* Options:
|
|
* - nodeId (fn) gets an id that uniquely identifies this node
|
|
* - redis (fn) redis constructor, defaults to redis
|
|
* - redisPub (object) options to pass to the pub redis client
|
|
* - redisSub (object) options to pass to the sub redis client
|
|
* - redisClient (object) options to pass to the general redis client
|
|
* - pack (fn) custom packing, defaults to JSON or msgpack if installed
|
|
* - unpack (fn) custom packing, defaults to JSON or msgpack if installed
|
|
*
|
|
* @api public
|
|
*/
|
|
|
|
function Redis (opts) {
|
|
opts = opts || {};
|
|
|
|
// node id to uniquely identify this node
|
|
var nodeId = opts.nodeId || function () {
|
|
// by default, we generate a random id
|
|
return Math.abs(Math.random() * Math.random() * Date.now() | 0);
|
|
};
|
|
|
|
this.nodeId = nodeId();
|
|
|
|
// packing / unpacking mechanism
|
|
if (opts.pack) {
|
|
this.pack = opts.pack;
|
|
this.unpack = opts.unpack;
|
|
} else {
|
|
try {
|
|
var msgpack = require('msgpack');
|
|
this.pack = msgpack.pack;
|
|
this.unpack = msgpack.unpack;
|
|
} catch (e) {
|
|
this.pack = JSON.stringify;
|
|
this.unpack = JSON.parse;
|
|
}
|
|
}
|
|
|
|
var redis = opts.redis || require('redis')
|
|
, RedisClient = redis.RedisClient;
|
|
|
|
// initialize a pubsub client and a regular client
|
|
if (opts.redisPub instanceof RedisClient) {
|
|
this.pub = opts.redisPub;
|
|
} else {
|
|
opts.redisPub || (opts.redisPub = {});
|
|
this.pub = redis.createClient(opts.redisPub.port, opts.redisPub.host, opts.redisPub);
|
|
}
|
|
if (opts.redisSub instanceof RedisClient) {
|
|
this.sub = opts.redisSub;
|
|
} else {
|
|
opts.redisSub || (opts.redisSub = {});
|
|
this.sub = redis.createClient(opts.redisSub.port, opts.redisSub.host, opts.redisSub);
|
|
}
|
|
if (opts.redisClient instanceof RedisClient) {
|
|
this.cmd = opts.redisClient;
|
|
} else {
|
|
opts.redisClient || (opts.redisClient = {});
|
|
this.cmd = redis.createClient(opts.redisClient.port, opts.redisClient.host, opts.redisClient);
|
|
}
|
|
|
|
Store.call(this, opts);
|
|
|
|
this.sub.setMaxListeners(0);
|
|
this.setMaxListeners(0);
|
|
};
|
|
|
|
/**
|
|
* Inherits from Store.
|
|
*/
|
|
|
|
Redis.prototype.__proto__ = Store.prototype;
|
|
|
|
/**
|
|
* Publishes a message.
|
|
*
|
|
* @api private
|
|
*/
|
|
|
|
Redis.prototype.publish = function (name) {
|
|
var args = Array.prototype.slice.call(arguments, 1);
|
|
this.pub.publish(name, this.pack({ nodeId: this.nodeId, args: args }));
|
|
this.emit.apply(this, ['publish', name].concat(args));
|
|
};
|
|
|
|
/**
|
|
* Subscribes to a channel
|
|
*
|
|
* @api private
|
|
*/
|
|
|
|
Redis.prototype.subscribe = function (name, consumer, fn) {
|
|
this.sub.subscribe(name);
|
|
|
|
if (consumer || fn) {
|
|
var self = this;
|
|
|
|
self.sub.on('subscribe', function subscribe (ch) {
|
|
if (name == ch) {
|
|
function message (ch, msg) {
|
|
if (name == ch) {
|
|
msg = self.unpack(msg);
|
|
|
|
// we check that the message consumed wasnt emitted by this node
|
|
if (self.nodeId != msg.nodeId) {
|
|
consumer.apply(null, msg.args);
|
|
}
|
|
}
|
|
};
|
|
|
|
self.sub.on('message', message);
|
|
|
|
self.on('unsubscribe', function unsubscribe (ch) {
|
|
if (name == ch) {
|
|
self.sub.removeListener('message', message);
|
|
self.removeListener('unsubscribe', unsubscribe);
|
|
}
|
|
});
|
|
|
|
self.sub.removeListener('subscribe', subscribe);
|
|
|
|
fn && fn();
|
|
}
|
|
});
|
|
}
|
|
|
|
this.emit('subscribe', name, consumer, fn);
|
|
};
|
|
|
|
/**
|
|
* Unsubscribes
|
|
*
|
|
* @api private
|
|
*/
|
|
|
|
Redis.prototype.unsubscribe = function (name, fn) {
|
|
this.sub.unsubscribe(name);
|
|
|
|
if (fn) {
|
|
var client = this.sub;
|
|
|
|
client.on('unsubscribe', function unsubscribe (ch) {
|
|
if (name == ch) {
|
|
fn();
|
|
client.removeListener('unsubscribe', unsubscribe);
|
|
}
|
|
});
|
|
}
|
|
|
|
this.emit('unsubscribe', name, fn);
|
|
};
|
|
|
|
/**
|
|
* Destroys the store
|
|
*
|
|
* @api public
|
|
*/
|
|
|
|
Redis.prototype.destroy = function () {
|
|
Store.prototype.destroy.call(this);
|
|
|
|
this.pub.end();
|
|
this.sub.end();
|
|
this.cmd.end();
|
|
};
|
|
|
|
/**
|
|
* Client constructor
|
|
*
|
|
* @api private
|
|
*/
|
|
|
|
function Client (store, id) {
|
|
Store.Client.call(this, store, id);
|
|
};
|
|
|
|
/**
|
|
* Inherits from Store.Client
|
|
*/
|
|
|
|
Client.prototype.__proto__ = Store.Client;
|
|
|
|
/**
|
|
* Redis hash get
|
|
*
|
|
* @api private
|
|
*/
|
|
|
|
Client.prototype.get = function (key, fn) {
|
|
this.store.cmd.hget(this.id, key, fn);
|
|
return this;
|
|
};
|
|
|
|
/**
|
|
* Redis hash set
|
|
*
|
|
* @api private
|
|
*/
|
|
|
|
Client.prototype.set = function (key, value, fn) {
|
|
this.store.cmd.hset(this.id, key, value, fn);
|
|
return this;
|
|
};
|
|
|
|
/**
|
|
* Redis hash del
|
|
*
|
|
* @api private
|
|
*/
|
|
|
|
Client.prototype.del = function (key, fn) {
|
|
this.store.cmd.hdel(this.id, key, fn);
|
|
return this;
|
|
};
|
|
|
|
/**
|
|
* Redis hash has
|
|
*
|
|
* @api private
|
|
*/
|
|
|
|
Client.prototype.has = function (key, fn) {
|
|
this.store.cmd.hexists(this.id, key, function (err, has) {
|
|
if (err) return fn(err);
|
|
fn(null, !!has);
|
|
});
|
|
return this;
|
|
};
|
|
|
|
/**
|
|
* Destroys client
|
|
*
|
|
* @param {Number} number of seconds to expire data
|
|
* @api private
|
|
*/
|
|
|
|
Client.prototype.destroy = function (expiration) {
|
|
if ('number' != typeof expiration) {
|
|
this.store.cmd.del(this.id);
|
|
} else {
|
|
this.store.cmd.expire(this.id, expiration);
|
|
}
|
|
|
|
return this;
|
|
};
|