Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Persistance Support #36

Merged
merged 30 commits into from
Jun 26, 2013
Merged
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5f57283
Implemented basic persistance layer on top of LevelUp.
mcollina Jun 17, 2013
718126e
Moved the wiring to the Server in the persistance itself.
mcollina Jun 17, 2013
4b953cf
Fixed spurious failure in the levelup tests.
mcollina Jun 18, 2013
69d1d15
Restoring a client's subscriptions after a disconnect.
mcollina Jun 18, 2013
a8785e2
Using a global nop instead of creating an empty function.
mcollina Jun 18, 2013
a193e4d
Added the support for uncleaned clients.
mcollina Jun 18, 2013
440ede7
Added the TTL to the persistance.
mcollina Jun 19, 2013
d4838ce
Updates node-level-ttl to the released 0.2.0.
mcollina Jun 20, 2013
a33587f
Made the persistances work even if called as a Function.
mcollina Jun 21, 2013
031e8ff
Added basic persistance layer for redis.
mcollina Jun 21, 2013
8ac488b
Added support for pattern subscriptions in the persistance.
mcollina Jun 21, 2013
48012d8
Added restoration support of the Qlobber data structure for LevelUp.
mcollina Jun 21, 2013
53c4d17
Added synchronization support for Redis persistance.
mcollina Jun 21, 2013
397f478
Limiting unclean client handling to QoS 1 subscriptions.
mcollina Jun 21, 2013
0ba1b7a
Updated Ascoltatori to 0.7.0
mcollina Jun 23, 2013
0bfc589
Added MongoDB as an optional dependency.
mcollina Jun 23, 2013
fa72c3c
Added mongodb in .travis.yml.
mcollina Jun 23, 2013
ff5b859
Implemented mongo persistance.
mcollina Jun 24, 2013
9d12d91
Made Mongo persistance spec faster.
mcollina Jun 24, 2013
2cbbe16
Added 'before' to the .jshintrc.
mcollina Jun 24, 2013
3bc35d3
Fixed offline topic support with wildcards in Mongodb.
mcollina Jun 25, 2013
ed772c8
Added memoization of topicPatterns.
mcollina Jun 25, 2013
90c1677
Reformatting of the persistance utilities module.
mcollina Jun 25, 2013
0f4bebc
Implemtened CLI support for persistance.
mcollina Jun 25, 2013
5ec3c83
Added benchmark support for uncleaned clients.
mcollina Jun 25, 2013
b93bf04
Refactored inflight support in Client to support storing them on disc…
mcollina Jun 25, 2013
722af40
Added support for storing inflight packets for client.
mcollina Jun 25, 2013
89a136e
Added logging to the persistance wiring.
mcollina Jun 26, 2013
a0fac09
Added API comments.
mcollina Jun 26, 2013
70bb77f
Updated the README with the persistance support.
mcollina Jun 26, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactored inflight support in Client to support storing them on disc…
…onnection.
  • Loading branch information
mcollina committed Jun 25, 2013
commit b93bf045860a5ec1b7cd8385ffabe4e2412048a9
16 changes: 12 additions & 4 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ Client.prototype.setUpTimer = function() {
*/
Client.prototype.actualSend = function(packet, retry) {
var that = this;
var timer;

if (that._closed) {
this.logger.warn({ packet: packet, retry: retry }, "tryint to send a packet to a disconnected client");
Expand All @@ -130,12 +131,18 @@ Client.prototype.actualSend = function(packet, retry) {

if (packet.qos === 1) {
this.logger.debug({ packet: packet, retry: retry }, "setting up the resend timer");
this.inflight[packet.messageId] = setTimeout(function() {

timer = setTimeout(function() {
retry++;
that.actualSend(packet, retry);

// exponential backoff algorithm
}, this.server.opts.baseRetryTimeout * Math.pow(2, retry));

this.inflight[packet.messageId] = {
packet: packet,
timer: timer
};
}
}
};
Expand Down Expand Up @@ -257,7 +264,7 @@ Client.prototype.handlePuback = function(packet) {

logger.debug({ packet: packet }, "puback");
if (this.inflight[packet.messageId]) {
clearTimeout(this.inflight[packet.messageId]);
clearTimeout(this.inflight[packet.messageId].timer);
delete this.inflight[packet.messageId];
} else {
logger.warn({ packet: packet }, "no such packet");
Expand Down Expand Up @@ -438,6 +445,9 @@ Client.prototype.close = function(callback) {
var cleanup = function() {
that._closed = true;

that.connection.removeAllListeners();
that.server.emit("clientDisconnected", that);

// clears the inflights timeout here
// as otherwise there might be one issued
// after calling end()
Expand All @@ -446,8 +456,6 @@ Client.prototype.close = function(callback) {
delete that.inflight[id];
});

that.connection.removeAllListeners();
that.server.emit("clientDisconnected", that);
if (callback) {
callback();
}
Expand Down