Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F117882362
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Authored By
Unknown
Size
5 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/apps/egara/src/egara_notifications_receiver.erl b/apps/egara/src/egara_notifications_receiver.erl
index 803efb2..95e36cd 100644
--- a/apps/egara/src/egara_notifications_receiver.erl
+++ b/apps/egara/src/egara_notifications_receiver.erl
@@ -1,132 +1,140 @@
%% Copyright 2014 Kolab Systems AG (http://www.kolabsys.com)
%%
%% Aaron Seigo (Kolab Systems) <seigo a kolabsys.com>
%%
%% This program is free software: you can redistribute it and/or modify
%% it under the terms of the GNU General Public License as published by
%% the Free Software Foundation, either version 3 of the License, or
%% (at your option) any later version.
%%
%% This program is distributed in the hope that it will be useful,
%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
%% GNU General Public License for more details.
%%
%% You should have received a copy of the GNU General Public License
%% along with this program. If not, see <http://www.gnu.org/licenses/>.
-module(egara_notifications_receiver).
-behaviour(gen_server).
-include_lib("kernel/include/file.hrl").
%% API
-export([ start_link/0
, notification_received/1
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, { storage_id = 0 }).
-define(PF_LOCAL, 1).
-define(SOCK_DGRAM, 2).
-define(UNIX_PATH_MAX, 108).
-define(MAX_SLEEP_MS, 500).
-define(MIN_SLEEP_MS, 1).
%% API
start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
notification_received(Notification) -> gen_server:cast(?MODULE, { notification, Notification }).
%% gen_server callbacks
init([]) ->
+ spawn(fun() -> recvNotifications() end),
+ MaxKey = egara_notification_store:max_key(),
+ { ok, #state{ storage_id = MaxKey + 1 } }.
+
+%% silly little thing that grabs the second-to-last item; very specific to what cyrus throws at us
+%% TODO: could be made more efficient by just going N items in since we "know" the format?
+cherryPickNotification([], LastTerm, _) ->
+ LastTerm;
+cherryPickNotification(Terms, _, ThisTerm) ->
+ [H|T] = Terms,
+ cherryPickNotification(T, ThisTerm, H).
+cherryPickNotification(Terms) ->
+ [H|T] = Terms,
+ cherryPickNotification(T, null, H).
+
+recvNotifications() ->
+ case application:get_env(imap_server) of
+ "cyrus" -> recvCyrusNotifications();
+ _ -> recvCyrusNotifications()
+ end.
+
+recvCyrusNotifications() ->
%% get the path to the listen socket, either from the app config
%% or here
case application:get_env(notification_socket_path) of
Value when is_binary(Value) -> SocketPath = Value;
_ -> SocketPath = <<"/tmp/egara-notify">>
end,
%% see if the file exists, and if it does, remove it if it is a socket
%% allows to start the application multiple times, which is a good thing
case file:read_file_info(SocketPath) of
{ ok, #file_info{ type = other } } -> ok = file:delete(SocketPath); %% TODO: handle errror with a report
{ ok, _ } -> notok; %% do not remove a non-socket file. TODO: handle errror with a report, clean exit
{error, _} -> ok
end,
{ ok, Socket } = procket:socket(?PF_LOCAL, ?SOCK_DGRAM, 0),
Sun = <<?PF_LOCAL:16/native, % sun_family
SocketPath/binary, % address
0:((?UNIX_PATH_MAX-byte_size(SocketPath))*8) %% zero out the rest
>>,
case procket:bind(Socket, Sun) of
- ok -> spawn(fun() -> recvNotification(Socket, ?MIN_SLEEP_MS) end);
+ ok -> recvCyrusNotification(Socket, ?MAX_SLEEP_MS);
{ error, PosixError } -> lager:error("Could not bind to notification socket; error is: ~p", [PosixError])
- end,
-
- MaxKey = egara_notification_store:max_key(),
- { ok, #state{ storage_id = MaxKey + 1 } }.
-
-%% silly little thing that grabs the second-to-last item; very specific to what cyrus throws at us
-%% TODO: could be made more efficient by just going N items in since we "know" the format?
-cherryPickNotification([], LastTerm, _) ->
- LastTerm;
-cherryPickNotification(Terms, _, ThisTerm) ->
- [H|T] = Terms,
- cherryPickNotification(T, ThisTerm, H).
-cherryPickNotification(Terms) ->
- [H|T] = Terms,
- cherryPickNotification(T, null, H).
+ end.
-recvNotification(Socket, SleepMs) ->
+recvCyrusNotification(Socket, SleepMs) ->
case procket:recvfrom(Socket, 16#FFFF) of
{ error, eagain } ->
NewSleepMs = min(SleepMs * 2, ?MAX_SLEEP_MS),
timer:sleep(NewSleepMs),
- recvNotification(Socket, NewSleepMs);
+ recvCyrusNotification(Socket, NewSleepMs);
{ ok, Buf } ->
%%lager:info("~s", [binary_to_list(Buf)]),
Components = binary:split(Buf, <<"\0">>, [global]),
%%lager:info("~p", [Components]),
Json = cherryPickNotification(Components),
gen_server:cast(?MODULE, { notification, Json }),
- recvNotification(Socket, ?MIN_SLEEP_MS)
+ recvCyrusNotification(Socket, ?MIN_SLEEP_MS)
end.
handle_call(_, _From, State) ->
{ reply, ok, State }.
handle_cast({ notification, Notification }, State) when is_binary(Notification) ->
try jsx:decode(Notification) of
Term -> egara_notification_store:add(State#state.storage_id, Term),
{ noreply, State#state{ storage_id = State#state.storage_id + 1 } } %% if paralellized, this needs to be syncronized
catch
error:_ -> { noreply, State }
end;
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
%% Upgrade from 2
code_change(_OldVsn, State, [from1To2]) ->
error_logger:info_msg("CODE_CHANGE from 2~n"),
{ state, StorageId } = State,
NewState = #state{ storage_id = StorageId },
{ ok, NewState }.
%% Note downgrade code_change not implemented
%%% Internal functions
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Mon, Apr 6, 12:29 AM (1 w, 2 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18831716
Default Alt Text
(5 KB)
Attached To
Mode
rE egara
Attached
Detach File
Event Timeline