Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Persistance Support #36

Merged
merged 30 commits into from
Jun 26, 2013
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5f57283
Implemented basic persistance layer on top of LevelUp.
mcollina Jun 17, 2013
718126e
Moved the wiring to the Server in the persistance itself.
mcollina Jun 17, 2013
4b953cf
Fixed spurious failure in the levelup tests.
mcollina Jun 18, 2013
69d1d15
Restoring a client's subscriptions after a disconnect.
mcollina Jun 18, 2013
a8785e2
Using a global nop instead of creating an empty function.
mcollina Jun 18, 2013
a193e4d
Added the support for uncleaned clients.
mcollina Jun 18, 2013
440ede7
Added the TTL to the persistance.
mcollina Jun 19, 2013
d4838ce
Updates node-level-ttl to the released 0.2.0.
mcollina Jun 20, 2013
a33587f
Made the persistances work even if called as a Function.
mcollina Jun 21, 2013
031e8ff
Added basic persistance layer for redis.
mcollina Jun 21, 2013
8ac488b
Added support for pattern subscriptions in the persistance.
mcollina Jun 21, 2013
48012d8
Added restoration support of the Qlobber data structure for LevelUp.
mcollina Jun 21, 2013
53c4d17
Added synchronization support for Redis persistance.
mcollina Jun 21, 2013
397f478
Limiting unclean client handling to QoS 1 subscriptions.
mcollina Jun 21, 2013
0ba1b7a
Updated Ascoltatori to 0.7.0
mcollina Jun 23, 2013
0bfc589
Added MongoDB as an optional dependency.
mcollina Jun 23, 2013
fa72c3c
Added mongodb in .travis.yml.
mcollina Jun 23, 2013
ff5b859
Implemented mongo persistance.
mcollina Jun 24, 2013
9d12d91
Made Mongo persistance spec faster.
mcollina Jun 24, 2013
2cbbe16
Added 'before' to the .jshintrc.
mcollina Jun 24, 2013
3bc35d3
Fixed offline topic support with wildcards in Mongodb.
mcollina Jun 25, 2013
ed772c8
Added memoization of topicPatterns.
mcollina Jun 25, 2013
90c1677
Reformatting of the persistance utilities module.
mcollina Jun 25, 2013
0f4bebc
Implemented CLI support for persistance.
mcollina Jun 25, 2013
5ec3c83
Added benchmark support for uncleaned clients.
mcollina Jun 25, 2013
b93bf04
Refactored inflight support in Client to support storing them on disc…
mcollina Jun 25, 2013
722af40
Added support for storing inflight packets for client.
mcollina Jun 25, 2013
89a136e
Added logging to the persistance wiring.
mcollina Jun 26, 2013
a0fac09
Added API comments.
mcollina Jun 26, 2013
70bb77f
Updated the README with the persistance support.
mcollina Jun 26, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Moved the wiring to the Server in the persistance itself.
  • Loading branch information
mcollina committed Jun 18, 2013
commit 718126ed679b2c8437ded582f467d7f2ad16ff78
32 changes: 32 additions & 0 deletions lib/persistance/abstract.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@

/**
 * Base type for persistence backends.
 *
 * It keeps no state of its own. `wire` calls `storeRetained`,
 * `lookupRetained` and `close` on `this`, none of which are defined here,
 * so they have to be supplied by whatever inherits from this type.
 */
function AbstractPersistence() {}

/**
 * Hooks this persistence up to the events emitted by a server.
 *
 * - "published": packets whose `retain` flag is truthy are handed to
 *   `storeRetained` (no callback is passed).
 * - "subscribed": the retained packets matching the pattern are looked up
 *   and each one is forwarded to the subscribing client; a lookup error is
 *   emitted on the client as an "error" event instead.
 * - "close": `close` is called on this persistence.
 *
 * @param {Object} server An object exposing `on(event, listener)`.
 */
AbstractPersistence.prototype.wire = function(server) {
  var persistence = this;

  function onPublished(packet) {
    if (!packet.retain) {
      return;
    }
    persistence.storeRetained(packet);
  }

  function onSubscribed(pattern, client) {
    // The whole stored packet doubles as the options argument of forward().
    function deliver(match) {
      client.forward(match.topic, match.payload, match, pattern);
    }

    persistence.lookupRetained(pattern, function(err, matches) {
      if (err) {
        client.emit("error", err);
        return;
      }
      matches.forEach(deliver);
    });
  }

  function onClose() {
    persistence.close();
  }

  server.on("published", onPublished);
  server.on("subscribed", onSubscribed);
  server.on("close", onClose);
};

module.exports = AbstractPersistence;
4 changes: 4 additions & 0 deletions lib/persistance/levelup.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

var levelup = require("levelup");
var sublevel = require("level-sublevel");
var AbstractPersistence = require("./abstract");
var util = require("util");

function LevelUpPersistance(path, options) {
options = options || {};
Expand All @@ -9,6 +11,8 @@ function LevelUpPersistance(path, options) {
this._retained = this.db.sublevel("retained");
}

util.inherits(LevelUpPersistance, AbstractPersistence);

/**
 * Writes a packet into the `_retained` store, using its topic as the key.
 *
 * @param {Object} packet The packet to store; `packet.topic` is the key.
 * @param {Function} cb Handed straight to the store's `put` call.
 */
LevelUpPersistance.prototype.storeRetained = function(packet, cb) {
  var retainedStore = this._retained;
  var key = packet.topic;

  retainedStore.put(key, packet, cb);
};
Expand Down
2 changes: 1 addition & 1 deletion lib/persistance/memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
var LevelUpPersistance = require("./levelup");
var util = require("util");
var MemDOWN = require("memdown");
var factory = function (location) { return new MemDOWN(location) };
var factory = function (location) { return new MemDOWN(location); };

function MemoryPersistance() {
LevelUpPersistance.call(this, "RAM", { db: factory });
Expand Down
22 changes: 1 addition & 21 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var persistance = require("./persistance");
* the client is passed as a parameter.
* - `published`, when a new message is published;
* the packet and the client are passed as parameters.
* - `subcribed`, when a new client is subscribed to a pattern;
* - `subscribed`, when a new client is subscribed to a pattern;
* the pattern and the client are passed as parameters.
*
* @param {Object} opts The option object
Expand Down Expand Up @@ -71,8 +71,6 @@ function Server(opts, callback) {
this.ascoltatore = ascoltatori.build(this.opts.backend);
this.ascoltatore.on("error", this.emit.bind(this));

this.persistance = this.opts.persistance || new persistance.Memory();

that.once("ready", callback);

async.series([
Expand All @@ -96,24 +94,6 @@ function Server(opts, callback) {
that.on("clientDisconnected", function(client) {
delete that.clients[client.id];
});

that.on("published", function(packet) {
if (packet.retain) {
that.persistance.storeRetained(packet);
}
});

that.on("subscribed", function(pattern, client) {
that.persistance.lookupRetained(pattern, function(err, matches) {
if (err) {
client.emit("error", err);
return;
}
matches.forEach(function(match) {
client.forward(match.topic, match.payload, match, pattern);
});
});
});
}

module.exports = Server;
Expand Down
48 changes: 48 additions & 0 deletions test/persistance/abstract.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"use strict";

var async = require("async");
var EventEmitter = require("events").EventEmitter;

module.exports = function(create) {

Expand Down Expand Up @@ -93,4 +94,51 @@ module.exports = function(create) {
}
], done);
});

// Checks that wire() stores a retained packet when the server emits
// "published", by reading it back through lookupRetained.
it("should wire itself up to the 'published' event of a Server", function(done) {
  var em = new EventEmitter();
  var packet1 = {
    topic: "hello/1",
    qos: 0,
    payload: "world",
    messageId: 42,
    retain: true
  };
  var originalStore = instance.storeRetained;

  // wire() calls storeRetained without a callback, so the test has no way to
  // know when the write has finished. Looking the packet up right after the
  // emit races against the store, so storeRetained is wrapped here and the
  // lookup only runs once the store has reported completion.
  instance.storeRetained = function(packet, cb) {
    originalStore.call(instance, packet, function(err) {
      instance.storeRetained = originalStore;

      if (cb) {
        cb(err);
      }

      if (err) {
        done(err);
        return;
      }

      instance.lookupRetained(packet1.topic, function(lookupErr, results) {
        if (lookupErr) {
          done(lookupErr);
          return;
        }
        expect(results).to.eql([packet1]);
        done();
      });
    });
  };

  instance.wire(em);

  em.emit("published", packet1);
});

// Checks that, once wired, a "subscribed" event makes the persistence
// forward an already stored retained packet to the subscribing client.
it("should wire itself up to the 'subscribed' event of a Server", function(done) {
  var server = new EventEmitter();
  var retained = {
    topic: "hello/1",
    qos: 0,
    payload: "world",
    messageId: 42,
    retain: true
  };
  var subscriber = {};

  // The test passes as soon as the stored packet reaches the client.
  subscriber.forward = function(topic, payload, options, pattern) {
    expect(topic).to.eql(retained.topic);
    expect(payload).to.eql(retained.payload);
    expect(options).to.eql(retained);
    expect(pattern).to.eql("hello/#");
    done();
  };

  function subscribe() {
    server.emit("subscribed", "hello/#", subscriber);
  }

  instance.wire(server);

  // Subscribe only after the packet has been stored.
  instance.storeRetained(retained, subscribe);
});
};
2 changes: 1 addition & 1 deletion test/persistance/levelup_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ var abstract = require("./abstract");
var LevelUp = require("../../").persistance.LevelUp;
var tmp = require("tmp");

describe("mosca.persitance.LevelUp", function() {
describe("mosca.persistance.LevelUp", function() {
abstract(function(cb) {
tmp.dir(function (err, path) {
if (err) {
Expand Down
2 changes: 1 addition & 1 deletion test/persistance/memory_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
var abstract = require("./abstract");
var Memory = require("../../").persistance.Memory;

describe("mosca.persitance.Memory", function() {
describe("mosca.persistance.Memory", function() {
abstract(function(cb) {
cb(null, new Memory());
});
Expand Down
3 changes: 3 additions & 0 deletions test/server_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,9 @@ describe("mosca.Server", function() {
});

it("should support retained messages", function(done) {
var pers = new mosca.persistance.Memory();

pers.wire(instance);

async.waterfall([

Expand Down