Skip to content

Commit

Permalink
First compiling version of pqueues.
Browse files Browse the repository at this point in the history
  • Loading branch information
lgastako committed Mar 7, 2009
1 parent 8c54de4 commit d63351f
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 21 deletions.
4 changes: 2 additions & 2 deletions erq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ handle_set(Args, Socket) ->
erqutils:debug("Reading data of size ~p.~n", [Size]),
case read_fixed_data(Socket, Size) of
{ok, Data} ->
case mqueue:enqueue(QueueName, Data) of
case pqueue:enqueue(QueueName, Data) of
ack -> "STORED\r\n";
{error, ErrorMessage} -> lists:flatten(io_lib:format("SERVER_ERROR ~p\r\n",
[ErrorMessage]));
Expand All @@ -54,7 +54,7 @@ handle_set(Args, Socket) ->
handle_get(Args) ->
[QueueName] = Args,
erqutils:debug("get requested from queue: ~p~n", [QueueName]),
case mqueue:dequeue(QueueName) of
case pqueue:dequeue(QueueName) of
{ok, Data} ->
lists:flatten(io_lib:format("VALUE ~s 0 ~.10B\r\n",
[QueueName, length(Data)]) ++ Data ++ "\r\nEND\r\n");
Expand Down
28 changes: 15 additions & 13 deletions journal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@ dequeue(QueueName) ->
end.


%% Build the on-disk location of the journal file for QueueName.
%% QueueName must be a string (list); the result is
%% "/tmp/erq_" followed by the queue name and the ".ej" extension.
journal_path(QueueName) ->
    %% TODO: Un-hardcode path
    lists:append(["/tmp/erq_", QueueName, ".ej"]).


%% Return the list of terms recorded in the journal of QueueName, in the
%% order they were written.
%%
%% file:consult/1 expects a file *name* and opens/closes the file itself,
%% so the path is handed to it directly instead of a pre-opened device.
%% A queue that has never been journaled has no file yet; that case is
%% reported as an empty history. Any other read or parse error crashes
%% the caller with a case_clause error.
replay(QueueName) ->
    case file:consult(journal_path(QueueName)) of
        {ok, Terms} -> Terms;
        {error, enoent} -> []
    end.


get_queue_pid(QueueName) ->
get_journal_pid(QueueName) ->
JournalNameAtom = list_to_atom("journal:" ++ QueueName),
case whereis(JournalNameAtom) of
undefined -> register(JournalNameAtom,
Expand All @@ -34,28 +41,23 @@ get_queue_pid(QueueName) ->
end.


%% From p. 235 of Programming Erlang
%%
%% Write every term of L to File, one per line and each terminated by a
%% period. Any existing content of File is replaced. Returns the result
%% of file:close/1.
unconsult(File, L) ->
    {ok, Device} = file:open(File, write),
    WriteTerm = fun(Term) -> io:format(Device, "~p.~n", [Term]) end,
    ok = lists:foreach(WriteTerm, L),
    file:close(Device).



%% Entry point of a journal process: open the journal file of QueueName
%% for appending and hand the device to the manage_journal/1 loop.
%%
%% file:open/2 returns {ok, IoDevice}; the device itself (not the whole
%% tuple) is what io:format/3 needs, so the result is unpacked here. A
%% failure to open the file crashes this process with a badmatch.
setup_journal(QueueName) ->
    {ok, File} = file:open(journal_path(QueueName), [write, append]),
    manage_journal(File).


%% Append Term to the open device File, pretty-printed and followed by a
%% period and a newline.
write_term(File, Term) ->
    io:fwrite(File, "~p.~n", [Term]).


manage_journal(File) ->
receive
{set, Data, ReplyPid} ->
write_op(File, {set, Data}),
write_term(File, {set, Data}),
%% TODO: Actually check results
ReplyPid ! ok;
{get, ReplyPid} ->
write_op(File, get),
write_term(File, get),
%% TODO: Actually check results
ReplyPid ! ok;
Other ->
Expand Down
27 changes: 21 additions & 6 deletions mqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,45 @@ get_queue_pid(QueueName) ->
QueueNameAtom = list_to_atom("queue" ++ QueueName),
case whereis(QueueNameAtom) of
undefined -> register(QueueNameAtom,
spawn(fun() -> manage_queue(queue:new()) end));
spawn(fun() -> setup_queue(QueueName) end));
ExistingQueuePid -> ExistingQueuePid
end.


%% Entry point of a queue process: read the journal of QueueName, rebuild
%% the queue from it starting from an empty queue, then enter the
%% manage_queue/2 loop.
setup_queue(QueueName) ->
    JournalItems = journal:replay(QueueName),
    RestoredQ = replay_journal_items(JournalItems, queue:new()),
    manage_queue(QueueName, RestoredQ).

manage_queue(Q) ->
%% erqutils:debug("Q is now length ~p and contains: ~p~n", [queue:len(Q), Q]),

%% Rebuild a queue by applying journal entries, oldest first, to Q.
%%
%% The journal process records `{set, Data}' for an enqueue and the atom
%% `get' for a dequeue, so each entry is re-applied the same way
%% manage_queue/2 handles the live operation: items are added with
%% queue:cons/2 (item first, queue second) and removed with queue:init/1.
%% An entry of any other shape crashes with function_clause.
replay_journal_items(Items, Q) ->
    lists:foldl(fun apply_journal_item/2, Q, Items).

%% Apply one journal entry to the queue.
apply_journal_item({set, Data}, Q) ->
    queue:cons(Data, Q);
apply_journal_item(get, Q) ->
    %% A dequeue on an empty queue changes nothing, as in manage_queue/2.
    case queue:is_empty(Q) of
        true -> Q;
        false -> queue:init(Q)
    end.


manage_queue(QueueName, Q) ->
%% erqutils:debug("Q is now length ~p and contains: ~p~n", [queue:len(Q), Q]),
erqutils:debug("Q is now length ~p.", [queue:len(Q)]),
receive
{add, Item, Pid} ->
NewQ = queue:cons(Item, Q),
%% TODO: confirm success
journal:enqueue(QueueName, Item),
Result = ack;
{get, Pid} ->
case queue:is_empty(Q) of
true ->
NewQ = Q,
%% TODO: should we journal:dequeue just in case?
Result = {empty};
false ->
NewQ = queue:init(Q),
%% TODO: confirm success
journal:dequeue(QueueName),
Result = {ok, queue:last(Q)}
end;
Msg ->
io:format("ERROR! Could not handle msg: ~p~n", [Msg]),
Other ->
io:format("ERROR! Could not handle msg: ~p~n", [Other]),
Pid = -1,
Result = {error, "could not handle msg"},
NewQ = Q
Expand All @@ -38,7 +53,7 @@ manage_queue(Q) ->
Pid =/= -1 ->
Pid ! Result
end,
manage_queue(NewQ).
manage_queue(QueueName, NewQ).


enqueue(QueueName, Data) ->
Expand Down

0 comments on commit d63351f

Please sign in to comment.