Skip to content

Commit

Permalink
Getting closer.
Browse files Browse the repository at this point in the history
  • Loading branch information
lgastako committed Mar 7, 2009
1 parent 97bb11d commit a879170
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 8 deletions.
4 changes: 2 additions & 2 deletions erq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ handle_set(Args, Socket) ->
erqutils:debug("Reading data of size ~p.", [Size]),
case read_fixed_data(Socket, Size) of
{ok, Data} ->
case pqueue:enqueue(QueueName, Data) of
case mqueue:enqueue(QueueName, Data) of
ack -> "STORED\r\n";
{error, ErrorMessage} -> lists:flatten(io_lib:format("SERVER_ERROR ~p\r\n",
[ErrorMessage]));
Expand All @@ -54,7 +54,7 @@ handle_set(Args, Socket) ->
handle_get(Args) ->
[QueueName] = Args,
erqutils:debug("get requested from queue: ~p", [QueueName]),
case pqueue:dequeue(QueueName) of
case mqueue:dequeue(QueueName) of
{ok, Data} ->
lists:flatten(io_lib:format("VALUE ~s 0 ~.10B\r\n",
[QueueName, length(Data)]) ++ Data ++ "\r\nEND\r\n");
Expand Down
18 changes: 13 additions & 5 deletions journal.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-module(journal).
-export([enqueue/2, dequeue/1]).
-export([enqueue/2, dequeue/1, replay/1]).


enqueue(QueueName, Data) ->
Expand All @@ -26,10 +26,18 @@ journal_path(QueueName) ->


%% Read back every term stored in the journal file of QueueName.
%%
%% Returns:
%%   {ok, Terms}      - the terms read from the journal file;
%%   {ok, []}         - no journal file exists (enoent), so there is
%%                      nothing to replay;
%%   {error, Reason}  - the journal could not be read; Reason is whatever
%%                      file:consult/1 reported.
replay(QueueName) ->
    erqutils:debug("Replaying journal items for queue: ~p", [QueueName]),
    %% file:consult/1 expects a file *name* and opens/closes the file on its
    %% own, so no separate file:open/file:close pair is needed (and no handle
    %% can leak when reading fails).
    case file:consult(journal_path(QueueName)) of
        {ok, Terms} ->
            {ok, Terms};
        {error, enoent} ->
            {ok, []};
        {error, Reason} ->
            io:format("Could not open journal for reason: ~p", [Reason]),
            {error, Reason}
    end.


get_journal_pid(QueueName) ->
Expand Down
6 changes: 5 additions & 1 deletion mqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ get_queue_pid(QueueName) ->


%% Build the initial in-memory queue for QueueName from its journal and hand
%% it over to manage_queue/2.
%%
%% journal:replay/1 answers {ok, Items} | {error, Reason}; the tuple has to be
%% unwrapped before the items are replayed. The match is deliberately
%% assertive: if the journal cannot be read this crashes with a badmatch
%% carrying the error instead of starting with a silently empty queue.
setup_queue(QueueName) ->
    erqutils:debug("Setting up queue: ~p", [QueueName]),
    {ok, Items} = journal:replay(QueueName),
    manage_queue(QueueName, replay_journal_items(Items, queue:new())).


%% Insert every journal item into queue Q and return the resulting queue.
%%
%% Items are appended at the rear with queue:in/2, so they end up in the queue
%% in the same order as they appear in the list.
%% NOTE(review): this assumes consumers take items from the front of the
%% queue (queue:out/1) - confirm against manage_queue.
replay_journal_items([], Q) ->
    erqutils:debug("Done with replay.", []),
    Q;
replay_journal_items([Head | Tail], Q) ->
    %% length/1 is the BIF; there is no lists:length/1.
    erqutils:debug("Replaying ~p more journal items.", [length(Tail) + 1]),
    replay_journal_items(Tail, queue:in(Head, Q)).


Expand Down

0 comments on commit a879170

Please sign in to comment.