Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Persistance Support #36

Merged
merged 30 commits into from
Jun 26, 2013
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5f57283
Implemented basic persistance layer on top of LevelUp.
mcollina Jun 17, 2013
718126e
Moved the wiring to the Server in the persistance itself.
mcollina Jun 17, 2013
4b953cf
Fixed spurious failure in the levelup tests.
mcollina Jun 18, 2013
69d1d15
Restoring a client's subscriptions after a disconnect.
mcollina Jun 18, 2013
a8785e2
Using a global nop instead of creating an empty function.
mcollina Jun 18, 2013
a193e4d
Added the support for uncleaned clients.
mcollina Jun 18, 2013
440ede7
Added the TTL to the persistance.
mcollina Jun 19, 2013
d4838ce
Updates node-level-ttl to the released 0.2.0.
mcollina Jun 20, 2013
a33587f
Made the persistances work even if called as a Function.
mcollina Jun 21, 2013
031e8ff
Added basic persistance layer for redis.
mcollina Jun 21, 2013
8ac488b
Added support for pattern subscriptions in the persistance.
mcollina Jun 21, 2013
48012d8
Added restoration support of the Qlobber data structure for LevelUp.
mcollina Jun 21, 2013
53c4d17
Added synchronization support for Redis persistance.
mcollina Jun 21, 2013
397f478
Limiting unclean client handling to QoS 1 subscriptions.
mcollina Jun 21, 2013
0ba1b7a
Updated Ascoltatori to 0.7.0
mcollina Jun 23, 2013
0bfc589
Added MongoDB as an optional dependency.
mcollina Jun 23, 2013
fa72c3c
Added mongodb in .travis.yml.
mcollina Jun 23, 2013
ff5b859
Implemented mongo persistance.
mcollina Jun 24, 2013
9d12d91
Made Mongo persistance spec faster.
mcollina Jun 24, 2013
2cbbe16
Added 'before' to the .jshintrc.
mcollina Jun 24, 2013
3bc35d3
Fixed offline topic support with wildcards in Mongodb.
mcollina Jun 25, 2013
ed772c8
Added memoization of topicPatterns.
mcollina Jun 25, 2013
90c1677
Reformatting of the persistance utilities module.
mcollina Jun 25, 2013
0f4bebc
Implemented CLI support for persistance.
mcollina Jun 25, 2013
5ec3c83
Added benchmark support for uncleaned clients.
mcollina Jun 25, 2013
b93bf04
Refactored inflight support in Client to support storing them on disc…
mcollina Jun 25, 2013
722af40
Added support for storing inflight packets for client.
mcollina Jun 25, 2013
89a136e
Added logging to the persistance wiring.
mcollina Jun 26, 2013
a0fac09
Added API comments.
mcollina Jun 26, 2013
70bb77f
Updated the README with the persistance support.
mcollina Jun 26, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added support for storing inflight packets for client.
  • Loading branch information
mcollina committed Jun 25, 2013
commit 722af400935494189743af8e7f886b6a863bd2fc
12 changes: 12 additions & 0 deletions lib/persistance/abstract.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"use strict";

var async = require("async");

/**
 * Base constructor for the persistence implementations.
 *
 * It performs no initialisation of its own; shared behaviour is attached
 * to its prototype.
 */
function AbstractPersistence() {}
Expand Down Expand Up @@ -60,7 +62,17 @@ AbstractPersistence.prototype.wire = function(server) {

server.on("clientDisconnecting", function(client) {
that.storeSubscriptions(client);
that.storeInflightPackets(client);
});
};

/**
 * Persists every inflight packet of a client.
 *
 * Each entry of `client.inflight` is handed to `this._storePacket`
 * together with `client.id`; `_storePacket` is expected to be supplied
 * by the concrete persistence.
 *
 * @param {Object} client The client; reads `client.id` and `client.inflight`
 *   (a map whose values carry a `packet` property).
 * @param {Function} [done] Called once all packets have been handed over,
 *   or with the first error reported by `_storePacket`.
 */
AbstractPersistence.prototype.storeInflightPackets = function(client, done) {
  var that = this;

  if (!client.inflight) {
    // Nothing to persist. Completion must still be signalled, otherwise a
    // caller waiting on `done` would hang forever.
    if (done) {
      done();
    }
    return;
  }

  async.each(Object.keys(client.inflight), function(key, cb) {
    that._storePacket(client.id, client.inflight[key].packet, cb);
  }, done);
};

module.exports = AbstractPersistence;
16 changes: 10 additions & 6 deletions lib/persistance/levelup.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,13 @@ LevelUpPersistance.prototype.lookupSubscriptions = function(client, done) {
/**
 * Stores a packet for every subscription matching its topic.
 *
 * The matching subscription keys come from `this._subLobber`; each key is
 * resolved through `this._subscriptions` and the packet is then stored for
 * the subscription's client via `this._storePacket`.
 *
 * @param {Object} packet The packet to store; `packet.topic` drives the match.
 * @param {Function} done Called when every matching subscription has been
 *   handled, or with the first lookup/storage error.
 */
LevelUpPersistance.prototype.storeOfflinePacket = function(packet, done) {
  var that = this;
  var subs = this._subLobber.match(packet.topic);

  async.each(subs, function(key, cb) {
    that._subscriptions.get(key, function(err, sub) {
      if (err) {
        return cb(err);
      }
      // Key and TTL handling live in _storePacket; storing here as well
      // would write the packet twice and invoke cb twice.
      that._storePacket(sub.client, packet, cb);
    });
  }, done);
};
Expand All @@ -165,6 +160,15 @@ LevelUpPersistance.prototype.streamOfflinePackets = function(client, cb, done) {
}
};

/**
 * Writes a single packet for a client into the offline packets store.
 *
 * The entry key is "<client>:<ISO timestamp>" and the entry is written
 * with the TTL taken from `this.options.ttl.subscriptions`.
 *
 * NOTE(review): the timestamp has millisecond resolution, so two packets
 * stored for the same client within one millisecond would presumably share
 * a key - confirm whether that can happen in practice.
 * NOTE(review): the subscriptions TTL is used for packets - confirm this
 * is intentional.
 *
 * @param {String} client The client identifier used as key prefix.
 * @param {Object} packet The packet to store.
 * @param {Function} cb Passed straight to `this._offlinePackets.put`.
 */
LevelUpPersistance.prototype._storePacket = function(client, packet, cb) {
  var timestamp = new Date().toISOString();
  var entryKey = util.format("%s:%s", client, timestamp);
  var putOptions = { ttl: this.options.ttl.subscriptions };

  this._offlinePackets.put(entryKey, packet, putOptions, cb);
};

/**
 * Closes the underlying database.
 *
 * @param {Function} cb Forwarded unchanged to `this.db.close`.
 */
LevelUpPersistance.prototype.close = function(cb) {
  var database = this.db;

  database.close(cb);
};
Expand Down
12 changes: 8 additions & 4 deletions lib/persistance/mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,7 @@ MongoPersistance.prototype.storeOfflinePacket = function(packet, done) {
stream.on("data", function(data) {
started++;

that._packets.insert({
client: data.client,
packet: packet
}, function(err) {
that._storePacket(data.client, packet, function(err) {
if (err) {
return stream.emit("error", err);
}
Expand All @@ -185,6 +182,13 @@ MongoPersistance.prototype.storeOfflinePacket = function(packet, done) {
});
};

/**
 * Inserts a packet for the given client into the packets collection.
 *
 * The inserted document has the shape `{ client, packet }`.
 *
 * @param {String} client The client identifier stored with the packet.
 * @param {Object} packet The packet to store.
 * @param {Function} cb Passed straight to `this._packets.insert`.
 */
MongoPersistance.prototype._storePacket = function(client, packet, cb) {
  var doc = {
    client: client,
    packet: packet
  };

  this._packets.insert(doc, cb);
};

MongoPersistance.prototype.streamOfflinePackets = function(client, cb) {
if (client.clean) {
return;
Expand Down
6 changes: 5 additions & 1 deletion lib/persistance/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,14 @@ RedisPersistance.prototype.storeOfflinePacket = function(packet, done) {

var matches = this._subLobber.match(packet.topic);
async.each(matches, function(client, cb) {
that._client.lpush("packets:" + client, JSON.stringify(packet), cb);
that._storePacket(client, packet, cb);
}, done);
};

/**
 * Pushes a packet onto the client's packet list.
 *
 * The list key is "packets:<client>" and the packet is stored as its JSON
 * serialisation.
 *
 * @param {String} client The client identifier used to build the list key.
 * @param {Object} packet The packet to serialise and store.
 * @param {Function} cb Passed straight to `this._client.lpush`.
 */
RedisPersistance.prototype._storePacket = function(client, packet, cb) {
  var listKey = "packets:" + client;
  var serialized = JSON.stringify(packet);

  this._client.lpush(listKey, serialized, cb);
};

RedisPersistance.prototype.streamOfflinePackets = function(client, cb) {
var that = this;

Expand Down
61 changes: 61 additions & 0 deletions test/persistance/abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -566,4 +566,65 @@ module.exports = function(create) {
});
});
});

describe("inflight packets", function() {
var packet = {
topic: "hello",
qos: 0,
payload: "world",
messageId: 42
};
var client = {
id: "my client id - 42",
clean: false,
subscriptions: {
hello: {
qos: 1
}
},
inflight: {
42: { packet: packet }
}
};

it("should store one inflight packet", function(done) {
this.instance.storeInflightPackets(client, done);
});

it("should store and stream an inflight packet", function(done) {
var instance = this.instance;
instance.storeInflightPackets(client, function() {
instance.streamOfflinePackets(client, function(err, p) {
expect(p).to.eql(packet);
done();
});
});
});

it("should delete the offline packets once streamed", function(done) {
var instance = this.instance;
instance.storeInflightPackets(client, function() {
instance.streamOfflinePackets(client, function(err, p) {
instance.streamOfflinePackets(client, function(err, p2) {
done(new Error("this should never be called"));
});
done();
});
});
});

it("should wire itself up to the 'clientDisconnecting' event of a Server", function(done) {
var em = new EventEmitter();
var instance = this.instance;
instance.wire(em);

em.emit("clientDisconnecting", client);

setTimeout(function() {
instance.streamOfflinePackets(client, function(err, packet) {
done();
});
}, 20); // 20ms will suffice
});
});
};