Skip to content

Commit

Permalink
Initial work on a clean pqueue/mqueue/journal setup.
Browse files Browse the repository at this point in the history
  • Loading branch information
lgastako committed Mar 7, 2009
1 parent 546da98 commit 8c54de4
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 69 deletions.
74 changes: 56 additions & 18 deletions journal.erl
Original file line number Diff line number Diff line change
@@ -1,26 +1,64 @@
-module(journal).
-export(...).
-export([enqueue/2, dequeue/1]).

start_journal_for_queue_path(QueuePath) ->
spawn(fun() -> manage_journal(QueuePath) end).

manage_journal(QueuePath) ->
enqueue(QueueName, Data) ->
JournalPid = get_journal_pid(QueueName),
JournalPid ! {set, Data, self()},
%% I bet you can just use "receive" as the last statement. TODO: Try this.
%% If not, there's surely a better idiom for this in Erlang, so figure it out.
receive
%% {cmd, open} -> do_open(QueuePath);
%% {cmd, erase} -> do_erase(QueuePath);
%% {cmd, roll} -> do_roll(QueuePath);
%% {cmd, close} -> do_close(QueuePath);
%% {cmd, add} -> do_add(QueuePath);
%% {cmd, add_with_xid} -> do_add_with_xid(QueuePath);
%% remove
%% remove_tentative
%% unremove
%% confirm ->
X -> X
end.

erase(Journal) ->
close it and delete it.

write_items(Journal, Items) ->
...
dequeue(QueueName) ->
JournalPid = get_journal_pid(QueueName),
JournalPid ! {get, self()},
receive
X -> X
end.


replay(QueueName) ->
{ok, Terms} = file:consult(File),
Terms.


get_queue_pid(QueueName) ->
JournalNameAtom = list_to_atom("journal:" ++ QueueName),
case whereis(JournalNameAtom) of
undefined -> register(JournalNameAtom,
spawn(fun() -> setup_journal(QueueName) end));
ExistingJournalPid -> ExistingJournalPid
end.


%% From p. 235 of Programming Erlang
unconsult(File, L) ->
{ok, S} = file:open(File, write),
lists:foreach(fun(X) -> io:format(S, "~p.~n",[X]) end, L),
file:close(S).



setup_journal(QueueName) ->
% Open the files,
File = file:open(File, [write, append])
manage_journal(File).


manage_journal(File) ->
receive
{set, Data, ReplyPid} ->
write_op(File, {set, Data}),
%% TODO: Actually check results
ReplyPid ! ok;
{get, ReplyPid} ->
write_op(File, get),
%% TODO: Actually check results
ReplyPid ! ok;
Other ->
io:format("Unexpected message: ~p~n", [Other])
end,
manage_journal(File).
4 changes: 2 additions & 2 deletions mqueue.erl
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
-module(erqueue).
-module(mqueue).
-export([enqueue/2, dequeue/1]).


get_queue_pid(QueueName) ->
QueueNameAtom = list_to_atom(QueueName),
QueueNameAtom = list_to_atom("queue" ++ QueueName),
case whereis(QueueNameAtom) of
undefined -> register(QueueNameAtom,
spawn(fun() -> manage_queue(queue:new()) end));
Expand Down
56 changes: 7 additions & 49 deletions pqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,16 @@
-export([enqueue/2, dequeue/1]).


get_queue_pid(QueueName) ->
QueueNameAtom = list_to_atom(QueueName),
case whereis(QueueNameAtom) of
undefined -> register(QueueNameAtom,
spawn(fun() -> manage_queue(queue:new()) end));
ExistingQueuePid -> ExistingQueuePid
end.



manage_queue(Q) ->
%% erqutils:debug("Q is now length ~p and contains: ~p~n", [queue:len(Q), Q]),
erqutils:debug("Q is now length ~p.", [queue:len(Q)]),
receive
{add, Item, Pid} ->
NewQ = queue:cons(Item, Q),
Result = ack;
{get, Pid} ->
case queue:is_empty(Q) of
true ->
NewQ = Q,
Result = {empty};
false ->
NewQ = queue:init(Q),
Result = {ok, queue:last(Q)}
end;
Msg ->
io:format("ERROR! Could not handle msg: ~p~n", [Msg]),
Pid = -1,
Result = {error, "could not handle msg"},
NewQ = Q
end,
if
Pid =/= -1 ->
Pid ! Result
end,
manage_queue(NewQ).


enqueue(QueueName, Data) ->
QueuePid = get_queue_pid(QueueName),
QueuePid ! {add, Data, self()},
receive
X -> X
end.
mqueue:enqueue(mangle(QueueName), Data),
journal:enqueue(mangle(QueueName), Data).


dequeue(QueueName) ->
QueuePid = get_queue_pid(QueueName),
QueuePid ! {get, self()},
receive
X -> X
end.
Result = mqueue:dequeue(mangle(QueueName)),
journal:remove(),
Result.


mangle(QueueName) ->
"__PERSISTENT__" ++ QueueName.

0 comments on commit 8c54de4

Please sign in to comment.