Persistent users, tokens & servers using Redis (fixes #9 & #10).

This commit is contained in:
Julien Fontanet 2013-09-11 12:44:45 +02:00
parent 937345f0e0
commit ac69349822
8 changed files with 584 additions and 221 deletions

View File

@ -2,7 +2,6 @@
"bitwise": true,
"curly": true,
"eqeqeq": true,
"es5": true,
"latedef": true,
"laxbreak": true,
"maxcomplexity": 10,

View File

@ -1,28 +1,20 @@
# Configuration of XO-Server's web server.
http:
# Address on which the server is listening on.
#
# Sets it to '*' to listen on all addresses.
#
# Default: 127.0.0.1
#host: '*'
# Port on which the server is listening on.
#
# Default: 80
#port: 8080
# Address on which the server is listening on.
# Configuration of the Redis server.
redis:
# Syntax: tcp://[db[:password]@]hostname[:port]
#
# Sets it to '*' to listen on all addresses.
#
# Default: localhost
#host: '*'
# For now, XO-Server does not have any persistence, therefore
# users should be recreated each time.
#
# So, temporarily, users can be created here.
users:
- email: 'admin@admin.com'
password: 'admin'
permission: 'admin'
# Same thing for servers.
servers:
- host: ''
username: ''
password: ''
# Default: tcp://localhost:6379
#uri: ''

View File

@ -3,19 +3,10 @@ var Q = require('q');
//////////////////////////////////////////////////////////////////////
function Collection(models)
function Collection()
{
// Parent constructor.
Collection.super_.call(this);
this.models = {};
this.next_id = 0;
if (models)
{
this.add(models);
}
}
require('util').inherits(Collection, require('events').EventEmitter);
@ -32,9 +23,6 @@ Collection.prototype.add = function (models, options) {
array = false;
}
// @todo Temporary mesure, implement “set()” instead.
var replace = !!(options && options.replace);
for (var i = 0, n = models.length; i < n; ++i)
{
var model = models[i];
@ -51,45 +39,18 @@ Collection.prototype.add = function (models, options) {
throw error;
}
var id = model.get('id');
if (undefined === id)
{
id = this.next_id++;
model.set('id', id);
}
// Existing models are ignored.
if (!replace && this.models[id])
{
return Q.reject('cannot add existing models!');
}
this.models[id] = models[i] = model.properties;
models[i] = model.properties;
}
this.emit('add', models);
var self = this;
return Q.when(this._add(models, options), function (models) {
self.emit('add', models);
/* jshint newcap: false */
return Q(array ? models : models[0]);
};
/**
*
*/
Collection.prototype.count = function (properties) {
return this.get(properties).then(function (models) {
return models.length;
});
};
/**
*
*/
Collection.prototype.exists = function (properties) {
return this.first(properties).then(function (model) {
return (null !== model);
if (!array)
{
return models[0];
}
return models;
});
};
@ -97,50 +58,42 @@ Collection.prototype.exists = function (properties) {
*
*/
Collection.prototype.first = function (properties) {
/* jshint newcap:false */
var model;
if (_.isObject(properties))
if (!_.isObject(properties))
{
model = _.findWhere(this.models, properties);
}
else
{
// Research by id.
model = this.models[properties];
properties = (undefined !== properties)
? { 'id': properties }
: {}
;
}
if (!model)
{
return Q(null);
}
var self = this;
return Q.when(this._first(properties), function (model) {
if (!model)
{
return null;
}
return Q(new this.model(model));
return new self.model(model);
});
};
/**
* Find all models which have a given set of properties.
*
* /!\: Does not return instance of this.model.
* /!\: Does not return instances of this.model.
*/
Collection.prototype.get = function (properties) {
/* jshint newcap: false */
// For coherence with other methods.
if ((undefined !== properties) && !_.isObject(properties))
if (!_.isObject(properties))
{
properties = {
'id': properties,
};
properties = (undefined !== properties)
? { 'id': properties }
: {}
;
}
if (_.isEmpty(properties))
{
return Q(_.values(this.models));
}
return Q(_.where(this.models, properties));
/* jshint newcap: false */
return Q(this._get(properties));
};
@ -153,15 +106,11 @@ Collection.prototype.remove = function (ids) {
ids = [ids];
}
_.each(ids, function (id) {
delete this.models[id];
}, this);
this.emit('remove', ids);
// @todo Maybe return a more meaningful value.
/* jshint newcap: false */
return Q(true); // @todo Returns false if it fails.
var self = this;
return Q.when(this._remove(ids), function () {
self.emit('remove', ids);
return true;
});
};
/**
@ -186,35 +135,116 @@ Collection.prototype.update = function (models) {
array = false;
}
// @todo Rewrite.
for (var i = 0; i < models.length; i++)
for (var i = 0, n = models.length; i < n; i++)
{
var model = models[i];
if (model instanceof this.model)
if ( !(model instanceof this.model) )
{
model = model.properties;
// @todo Problems, we may be mixing in some default
// properties which will overwrite existing ones.
model = new this.model(model);
}
var id = model.id;
var id = model.get('id');
// Missing models should be added not updated.
if (!this.models[id])
if (!id)
{
return Q.reject('missing model');
return Q.reject('a model without an id cannot be updated');
}
// @todo Model validation.
var error = model.validate();
if (undefined !== error)
{
// @todo Better system inspired by Backbone.js.
throw error;
}
// @todo Event handling.
_.extend(this.models[id], model);
models[i] = model.properties;
}
/* jshint newcap: false */
return Q(array ? models : models[0]);
var self = this;
return Q.when(this._update(models), function (models) {
self.emit('update', models);
if (!array)
{
return models[0];
}
return models;
});
};
Collection.extend = require('extendable');
//Collection.extend = require('extendable');
//////////////////////////////////////////////////////////////////////
// Methods to override in implentations.
//////////////////////////////////////////////////////////////////////
/**
*
*/
Collection.prototype._add = function (models, options) {
throw 'not implemented';
};
/**
*
*/
Collection.prototype._get = function (properties) {
throw 'not implemented';
};
/**
*
*/
Collection.prototype._remove = function (ids) {
throw 'not implemented';
};
/**
*
*/
Collection.prototype._update = function (models) {
throw 'not implemented';
};
//////////////////////////////////////////////////////////////////////
// Methods which may be overriden in implentations.
//////////////////////////////////////////////////////////////////////
/**
*
*/
Collection.prototype.count = function (properties) {
return this.get(properties).then(function (models) {
return models.length;
});
};
/**
*
*/
Collection.prototype.exists = function (properties) {
return this.first(properties).then(function (model) {
return (null !== model);
});
};
/**
*
*/
Collection.prototype._first = function (properties) {
return Q.when(this.get(properties), function (models) {
if (0 === models.length)
{
return null;
}
return models[0];
});
};
//////////////////////////////////////////////////////////////////////

99
src/collection/memory.js Normal file
View File

@ -0,0 +1,99 @@
var _ = require('underscore');
var Q = require('q');
//////////////////////////////////////////////////////////////////////
function Memory(models)
{
Memory.super_.call(this);
this.models = {};
this.next_id = 0;
if (models)
{
this.add(models);
}
}
require('util').inherits(Memory, require('../collection'));
Memory.prototype._add = function (models, options) {
// @todo Temporary mesure, implement “set()” instead.
var replace = !!(options && options.replace);
for (var i = 0, n = models.length; i < n; ++i)
{
var model = models[i];
var id = model.id;
if (undefined === id)
{
model.id = id = ''+ this.next_id++;
}
else if (!replace && this.models[id])
{
// Existing models are ignored.
return Q.reject('cannot add existing models!');
}
this.models[id] = model;
}
return models;
};
Memory.prototype._first = function (properties) {
if (_.isEmpty(properties))
{
// Return the first model if any.
for (var id in this.models)
{
return this.models[id];
}
return null;
}
return _.findWhere(this.models, properties);
};
Memory.prototype._get = function (properties) {
if (_.isEmpty(properties))
{
return _.values(this.models);
}
return _.where(this.model, properties);
};
Memory.prototype._remove = function (ids) {
for (var i = 0, n = ids.length; i < n; ++i)
{
delete this.models[ids[i]];
}
};
Memory.prototype._update = function (models) {
for (var i = 0, n = models.length; i < n; i++)
{
var model = models[i];
var id = model.id;
// Missing models should be added not updated.
if (!this.models[id])
{
return Q.reject('missing model');
}
_.extend(this.models[id], model);
}
return models;
};
//////////////////////////////////////////////////////////////////////
Memory.extend = require('extendable');
module.exports = Memory;

231
src/collection/redis.js Normal file
View File

@ -0,0 +1,231 @@
var _ = require('underscore');
var Q = require('q');
var then_redis;
function create_redis_client(uri)
{
if (!then_redis)
{
then_redis = require('then-redis');
}
return then_redis.createClient(uri);
}
//////////////////////////////////////////////////////////////////////
// Data model:
// - prefix +'_id': value of the last generated identifier;
// - prefix +'_ids': set containing identifier of all models;
// - prefix +'_'+ index +':' + value: set of identifiers which have
// value for the given index.
// - prefix +':'+ id: hash containing the properties of a model;
//////////////////////////////////////////////////////////////////////
// @todo then-redis sends commands in order, we should use this
// semantic to simplify the code.
// @todo Merge the options in the object to obtain extend-time
// configuration like Backbone.
// @todo Remote events.
function Redis(options, models)
{
if (!options)
{
options = {};
}
_.defaults(options, {
'uri': 'tcp://localhost:6379',
'indexes': [],
});
if (!options.prefix)
{
throw 'missing option: prefix';
}
Redis.super_.call(this, models);
this.redis = options.connection || create_redis_client(options.uri);
this.prefix = options.prefix;
this.indexes = options.indexes;
}
require('util').inherits(Redis, require('../collection'));
// Private method.
Redis.prototype._extract = function (ids) {
var redis = this.redis;
var prefix = this.prefix +':';
var promises = [];
_.each(ids, function (id) {
promises.push(redis.hgetall(prefix + id).then(function (model) {
// If empty, considers it a no match and returns null.
if (_.isEmpty(model))
{
return null;
}
// Mix the identifier in.
model.id = id;
return model;
}));
});
return Q.all(promises).then(function (models) {
return _.filter(models, function (model) {
return (null !== model);
});
});
};
Redis.prototype._add = function (models, options) {
// @todo Temporary mesure, implement “set()” instead.
var replace = !!(options && options.replace);
var redis = this.redis;
var prefix = this.prefix;
var indexes = this.indexes;
var promises = [];
_.each(models, function (model) {
var promise;
// Generates a new identifier if necessary.
if (undefined === model.id)
{
promise = redis.incr(prefix +'_id').then(function (id) {
model.id = id;
});
}
promise = Q.when(promise, function () {
// Adds the identifier to the models' ids set.
return redis.sadd(prefix +'_ids', model.id);
}).then(function (success) {
// The entry already existed an we are not in replace mode.
if (!success && !replace)
{
throw 'cannot add existing model: '+ model.id;
}
// @todo Remove existing fields.
var params = [prefix +':'+ model.id];
_.each(model, function (value, prop) {
// No need to store the id (already in the key.)
if ('id' === prop)
{
return;
}
params.push(prop, value);
});
var promises = [
redis.send('hmset', params),
];
// Adds indexes.
_.each(indexes, function (index) {
var value = model[index];
if (undefined === value)
{
return;
}
var key = prefix +'_'+ index +':'+ value;
promises.push(redis.sadd(key, model.id));
});
return Q.all(promises);
}).thenResolve(model);
promises.push(promise);
});
return Q.all(promises);
};
Redis.prototype._get = function (properties) {
var prefix = this.prefix;
var redis = this.redis;
var self = this;
if (_.isEmpty(properties))
{
return redis.smembers(prefix +'_ids').then(function (ids) {
return self._extract(ids);
});
}
// Special treatment for 'id'.
var id = properties.id;
delete properties.id;
// Special case where we only match against id.
if (_.isEmpty(properties))
{
return this._extract([id]);
}
var indexes = this.indexes;
var unfit = _.difference(_.keys(properties), indexes);
if (0 !== unfit.length)
{
throw 'not indexed fields: '+ unfit.join();
}
var keys = _.map(properties, function (value, index) {
return (prefix +'_'+ index +':'+ value);
});
return redis.send('sinter', keys).then(function (ids) {
if (undefined !== id)
{
if (!_.contains(ids, id))
{
return [];
}
ids = [id];
}
return self._extract(ids);
});
};
Redis.prototype._remove = function (ids) {
var redis = this.redis;
var prefix = this.prefix;
var promises = [];
var keys = [];
for (var i = 0, n = ids.length; i < n; ++i)
{
keys.push(prefix +':'+ ids[i]);
}
// @todo Handle indexes.
promises.push(
redis.send('srem', [prefix +'_ids'].concat(ids)),
redis.send('del', keys)
);
return Q.all(promises);
};
Redis.prototype._update = function (models) {
// @todo
return this._add(models, { 'replace': true });
};
//////////////////////////////////////////////////////////////////////
Redis.extend = require('extendable');
module.exports = Redis;

View File

@ -69,9 +69,9 @@ function json_api_call(session, message)
});
},
function (error) {
if (error instanceof Error)
if (!_.isObject(error) || (error instanceof Error))
{
console.error(error.stack);
console.error(error.stack || error);
return format_error(Api.err.SERVER_ERROR);
}
@ -323,40 +323,47 @@ var cfg = {
// Defaults values.
cfg.merge({
'http': {
'host': '127.0.0.1',
'port': 80,
'host': 'localhost',
},
'users': [],
'servers': [],
'redis': {
'uri': 'tcp://127.0.0.1:6379',
},
});
Q.ninvoke(require('fs'), 'readFile', __dirname +'/../config/local.yaml', {'encoding': 'utf8'}).then(function (data) {
data = require('js-yaml').safeLoad(data);
cfg.merge(data);
}).fail(function (e) {
console.error('[ERROR] Reading config file: '+ e);
console.error('[Warning] Reading config file: '+ e);
}).then(function () {
var users = xo.users;
cfg.get('users').forEach(function (user) {
if (user.password)
{
users.create(user.email, user.password, user.permission).done();
}
else
{
users.add(user).done();
}
});
xo.servers.add(cfg.get('servers')).done();
if (cfg.get('users'))
{
console.warn('[Warn] Users in config file are no longer supported.');
}
if (cfg.get('servers'))
{
console.warn('[Warn] Servers in config file are no longer supported.');
}
var port = cfg.get('http', 'port');
http_serv = require('http').createServer().listen(port).on('listening', function () {
console.log('XO-Server Web server is listening on port '+ port +'.');
console.info('XO-Server Web server is listening on port '+ port +'.');
});
var redis = require('then-redis').createClient('tcp://localhost:6379');
xo.start(cfg);
}).done();
// Create an initial user if there are none.
xo.on('started', function () {
xo.users.exists().then(function (success) {
if (success)
{
return;
}
console.warn('[Warning] No users, creating “admin@admin.net” with password “admin”');
return xo.users.create('admin@admin.net', 'admin', 'admin');
}).done();
});

View File

@ -22,7 +22,7 @@ require('util').inherits(Model, require('events').EventEmitter);
Model.prototype.initialize = function () {};
/**
* Validates the model.
* Validates the defined properties.
*
* @returns {undefined|mixed} Returns something else than undefined if
* there was an error.

157
src/xo.js
View File

@ -3,7 +3,8 @@ var crypto = require('crypto');
var hashy = require('hashy');
var Q = require('q');
var Collection = require('./collection');
var MemoryCollection = require('./collection/memory');
var RedisCollection = require('./collection/redis');
var Model = require('./model');
var Xapi = require('./xapi');
@ -40,7 +41,7 @@ var Server = Model.extend({
},
});
var Servers = Collection.extend({
var Servers = RedisCollection.extend({
'model': Server,
});
@ -61,13 +62,13 @@ var Token = Model.extend({
return Q.ninvoke(crypto, 'randomBytes', 32).then(function (buf) {
return new Token({
'id': buf.toString('base64'),
'user_id': +user_id,
'user_id': user_id,
});
});
},
});
var Tokens = Collection.extend({
var Tokens = RedisCollection.extend({
'model': Token,
'generate': function (user_id) {
@ -135,7 +136,7 @@ var User = Model.extend({
});
// @todo handle email uniqueness.
var Users = Collection.extend({
var Users = RedisCollection.extend({
'model': User,
'create': function (email, password, permission) {
@ -158,7 +159,7 @@ var Users = Collection.extend({
var Pool = Model.extend({});
var Pools = Collection.extend({
var Pools = MemoryCollection.extend({
'model': Pool,
});
@ -166,7 +167,7 @@ var Pools = Collection.extend({
var Host = Model.extend({});
var Hosts = Collection.extend({
var Hosts = MemoryCollection.extend({
'model': Host,
});
@ -174,7 +175,7 @@ var Hosts = Collection.extend({
var VM = Model.extend({});
var VMs = Collection.extend({
var VMs = MemoryCollection.extend({
'model': VM,
});
@ -182,7 +183,7 @@ var VMs = Collection.extend({
var Network = Model.extend({});
var Networks = Collection.extend({
var Networks = MemoryCollection.extend({
'model': Network,
});
@ -190,7 +191,7 @@ var Networks = Collection.extend({
var SR = Model.extend({});
var SRs = Collection.extend({
var SRs = MemoryCollection.extend({
'model': SR,
});
@ -198,7 +199,7 @@ var SRs = Collection.extend({
var VDI = Model.extend({});
var VDIs = Collection.extend({
var VDIs = MemoryCollection.extend({
'model': VDI,
});
@ -206,7 +207,7 @@ var VDIs = Collection.extend({
var PIF = Model.extend({});
var PIFs = Collection.extend({
var PIFs = MemoryCollection.extend({
'model': PIF,
});
@ -214,7 +215,7 @@ var PIFs = Collection.extend({
var VIF = Model.extend({});
var VIFs = Collection.extend({
var VIFs = MemoryCollection.extend({
'model': VIF,
});
@ -222,7 +223,7 @@ var VIFs = Collection.extend({
// Collections
//////////////////////////////////////////////////////////////////////
var VDIs = Collection.extend({
var VDIs = MemoryCollection.extend({
'model': VDI,
});
@ -511,52 +512,7 @@ function Xo()
var xo = this;
//--------------------------------------
// Main objects (@todo should be persistent).
xo.servers = new Servers();
xo.tokens = new Tokens();
xo.users = new Users();
// When a server is added we should connect to it and fetch data.
xo.servers.on('add', function (servers) {
_.each(servers, function (server) {
var xapi = new Xapi(server.host);
xapi.connect(server.username, server.password).then(function () {
// @todo Use events.
!function helper() {
refresh(xo, xapi).then(function () {
setTimeout(helper, 5000);
}).done();
}();
}).fail(function (error) {
console.log(error);
}).done();
});
});
xo.servers.on('remove', function (server_ids) {
// @todo
});
// xo events are used to automatically close connections if the
// associated credentials are invalidated.
xo.tokens.on('remove', function (token_ids) {
_.each(token_ids, function (token_id) {
xo.emit('token.revoked:'+ token_id);
});
});
xo.users.on('remove', function (user_ids) {
_.each(user_ids, function (user_id) {
user_id = +user_id;
xo.emit('user.revoked:'+ user_id);
// All associated tokens must be destroyed too.
xo.tokens.get({'user_id': user_id}).then(function (tokens) {
return xo.tokens.remove(_.pluck(tokens, 'id'));
}).done();
});
});
// Connections to Xen pools/servers.
xo.connections = {};
@ -575,31 +531,80 @@ function Xo()
// Connecting classes. (@todo VBD & SR).
xo.vifs = new VIFs();
xo.pifs = new PIFs();
// -------------------------------------
// Temporary data for testing purposes.
xo.users.add([{
'email': 'bob@gmail.com',
'pw_hash': '$2a$10$PsSOXflmnNMEOd0I5ohJQ.cLty0R29koYydD0FBKO9Rb7.jvCelZq',
'permission': 'admin',
}, {
'email': 'toto@gmail.com',
'pw_hash': '$2a$10$PsSOXflmnNMEOd0I5ohJQ.cLty0R29koYydD0FBKO9Rb7.jvCelZq',
'permission': 'none',
}]).done();
}
require('util').inherits(Xo, require('events').EventEmitter);
Xo.prototype.start = function (options) {
Xo.prototype.start = function (cfg) {
var xo = this;
var redis = require('then-redis').createClient(cfg.get('redis', 'uri'));
// @todo Connect to persistent collection.
//--------------------------------------
// Persistent collections.
xo.servers = new Servers({
'connection': redis,
'prefix': 'xo:server',
'indexes': ['host'],
});
xo.tokens = new Tokens({
'connection': redis,
'prefix': 'xo:token',
'indexes': ['user_id'],
});
xo.users = new Users({
'connection': redis,
'prefix': 'xo:user',
'indexes': ['email'],
});
// When a server is added we should connect to it and fetch data.
var connect = function (server) {
var xapi = new Xapi(server.host);
xapi.connect(server.username, server.password).then(function () {
// @todo Use events.
!function helper() {
refresh(xo, xapi).then(function () {
setTimeout(helper, 5000);
}).done();
}();
}).fail(function (error) {
console.log(error);
}).done();
};
// Connect existing servers.
xo.servers.get().then(function (servers) {
_.each(servers, connect);
}).done();
// Automatically connect new servers.
xo.servers.on('add', function (servers) {
_.each(servers, connect);
});
xo.servers.on('remove', function (server_ids) {
// @todo
});
// xo events are used to automatically close connections if the
// associated credentials are invalidated.
xo.tokens.on('remove', function (token_ids) {
_.each(token_ids, function (token_id) {
xo.emit('token.revoked:'+ token_id);
});
});
xo.users.on('remove', function (user_ids) {
_.each(user_ids, function (user_id) {
xo.emit('user.revoked:'+ user_id);
// All associated tokens must be destroyed too.
xo.tokens.get({'user_id': user_id}).then(function (tokens) {
return xo.tokens.remove(_.pluck(tokens, 'id'));
}).done();
});
});
//--------------------------------------
xo.emit('started', options);
xo.emit('started', cfg);
};
//////////////////////////////////////////////////////////////////////