diff --git a/apps/egara/src/egara_ldap.erl b/apps/egara/src/egara_ldap.erl index b906cc7..6ee5734 100644 --- a/apps/egara/src/egara_ldap.erl +++ b/apps/egara/src/egara_ldap.erl @@ -1,148 +1,148 @@ %% Copyright 2014 Kolab Systems AG (http://www.kolabsys.com) %% %% Aaron Seigo (Kolab Systems) %% %% This program is free software: you can redistribute it and/or modify %% it under the terms of the GNU General Public License as published by %% the Free Software Foundation, either version 3 of the License, or %% (at your option) any later version. %% %% This program is distributed in the hope that it will be useful, %% but WITHOUT ANY WARRANTY; without even the implied warranty of %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the %% GNU General Public License for more details. %% %% You should have received a copy of the GNU General Public License %% along with this program. If not, see . -module(egara_ldap). -behaviour(gen_server). -include_lib("eldap/include/eldap.hrl"). %% API -export([ start_link/1, fetch_userdata_for_login/2 ]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% state record definition %%TODO: support reconfiguration requests -record(state, { base_dn, ldap_connection = none }). %% public API start_link(Args) -> gen_server:start_link(?MODULE, Args, []). fetch_userdata_for_login(Pid, UserLogin) -> gen_server:call(Pid, { fetch_userdata, UserLogin }). %% gen_server API init(_Args) -> process_flag(trap_exit, true), State = #state{ base_dn = proplists:get_value(base_dn, application:get_env(egara, ldap, []), [])}, { ok, State }. handle_call({ fetch_userdata, UserLogin } , From, State) when is_list(UserLogin) -> - handle_call({ store_userdata, erlang:list_to_binary(UserLogin) }, From, State); + handle_call({ store_userdata, egara_utils:as_binary(UserLogin) }, From, State); handle_call({ fetch_userdata, UserLogin }, _From, State) -> %%TODO perhaps call with continuation to avoid create new state everytime? NewState = ensure_connected(State), { reply, query_userdata(UserLogin, NewState), NewState }; handle_call(_Request, _From, State) -> { reply, ok, State }. handle_cast(_Msg, State) -> { noreply, State }. handle_info({'EXIT', _ParentPid, shutdown}, _State) -> exit(shutdown); handle_info({'EXIT', From, _Reason}, State) -> %% look out for our ldap connection dropping if From =:= State#state.ldap_connection -> lager:warning("Just lost our ldap connection..."), { noreply, State#state{ ldap_connection = none } }; true -> { noreply, State } end; handle_info(_Info, State) -> { noreply, State }. terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> { ok, State }. %% private API %% LDAP CONNECTION ROUTINES ensure_connected(#state{ ldap_connection = none } = State) -> %%TODO eldap:start_tls? LDAPConfig = application:get_env(egara, ldap, []), connect_with_config(proplists:get_value(hosts, LDAPConfig), proplists:get_value(port, LDAPConfig), proplists:get_value(bind_dn, LDAPConfig), proplists:get_value(bind_pw, LDAPConfig), State); ensure_connected(State) -> State. connect_with_config(Hosts, Port, BindDn, BindPw, State) when is_list(Hosts), is_integer(Port), is_list(BindDn), is_list(BindPw) -> ldap_connection_attempt(eldap:open(Hosts, [ { port, Port } ]), BindDn, BindPw, State); connect_with_config(Hosts, _Port, BindDn, BindPw, State) when is_list(Hosts), is_list(BindDn), is_list(BindPw) -> ldap_connection_attempt(eldap:open(Hosts), BindDn, BindPw, State); connect_with_config(_Hosts, _Port, _BindDn, _BindPw, State) -> lager:warning("LDAP configuration is broken!"), State. ldap_connection_attempt({ ok, Handle }, BindDn, BindPw, State) -> ldap_bind_attempt(eldap:simple_bind(Handle, BindDn, BindPw), Handle, State); ldap_connection_attempt({ error, Reason }, _BindDn, _BindPw, State) -> lager:error("LDAP: Could not connect to server: ~p", [Reason]), State. ldap_bind_attempt(ok, Handle, State) -> link(Handle), State#state{ ldap_connection = Handle }; ldap_bind_attempt({ error, Reason }, Handle, State) -> lager:error("LDAP: Could not authenticate to server: ~p", [Reason]), eldap:close(Handle), State#state{ ldap_connection = none }. %% QUERY FOR USER DATA query_userdata(_UserLogin, #state{ ldap_connection = none }) -> notfound; query_userdata(UserLogin, State) -> UserBaseDn = "ou=People," ++ State#state.base_dn, Attrs = ["cn", "nsuniqueid"], %%TODO nsuniqueid configurable? UserLoginString = erlang:binary_to_list(UserLogin), UserAuthAttrs = ["mail", "alias", "uid"], %%TODO: configurable? ObjectClassFilter = eldap:equalityMatch("objectclass", "inetorgperson"), UserFilter = eldap:'or'(lists:foldl(fun(Attr, Acc) -> [ eldap:equalityMatch(Attr, UserLoginString) | Acc ] end, [], UserAuthAttrs)), Filter = eldap:'and'([UserFilter, ObjectClassFilter]), SearchOptions = [{ base, UserBaseDn }, { filter, Filter }, { attributes, Attrs } ], %%lager:info("OUR FILTERS ARE ~p", [SearchOptions]), LDAPResponse = eldap:search(State#state.ldap_connection, SearchOptions), create_userdata_query_response(UserLogin, LDAPResponse). create_userdata_query_response(UserLogin, { ok, #eldap_search_result{ entries = [] } }) -> lager:warning("LDAP: Could not find requested user ~p", [UserLogin]), notfound; create_userdata_query_response(_UserLogin, { ok, #eldap_search_result{ entries = [ Entry | _ ] } }) -> %%TODO: if we get more than one match? LDAPAttributes = Entry#eldap_entry.attributes, %% TODO should it really be exporting ldap-specifics like "dn" and "cn"? [ - { <<"id">>, list_to_binary(hd(proplists:get_value("nsuniqueid", LDAPAttributes, ""))) }, - { <<"cn">>, list_to_binary(hd(proplists:get_value("cn", LDAPAttributes, ""))) }, - { <<"dn">>, list_to_binary(Entry#eldap_entry.object_name) } + { <<"id">>, egara_utils:as_binary(hd(proplists:get_value("nsuniqueid", LDAPAttributes, ""))) }, + { <<"cn">>, egara_utils:as_binary(hd(proplists:get_value("cn", LDAPAttributes, ""))) }, + { <<"dn">>, egara_utils:as_binary(Entry#eldap_entry.object_name) } ]; create_userdata_query_response(UserLogin, { error, Reason }) -> lager:warning("LDAP: Could not find requested user ~p, reason: ~p", [UserLogin, Reason]), notfound; create_userdata_query_response(_UserLogin, _) -> lager:warning("LDAP: unknown error on user query"), notfound. diff --git a/apps/egara/src/egara_riak_config.erl b/apps/egara/src/egara_riak_config.erl index 2216ade..2110145 100644 --- a/apps/egara/src/egara_riak_config.erl +++ b/apps/egara/src/egara_riak_config.erl @@ -1,91 +1,91 @@ %% Copyright 2014 Kolab Systems AG (http://www.kolabsys.com) %% %% Aaron Seigo (Kolab Systems) %% %% This program is free software: you can redistribute it and/or modify %% it under the terms of the GNU General Public License as published by %% the Free Software Foundation, either version 3 of the License, or %% (at your option) any later version. %% %% This program is distributed in the hope that it will be useful, %% but WITHOUT ANY WARRANTY; without even the implied warranty of %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the %% GNU General Public License for more details. %% %% You should have received a copy of the GNU General Public License %% along with this program. If not, see . -module(egara_riak_config). -behaviour(gen_server). %% API -export([start_link/0, connection_params/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% state record definition -record(state, { pos = 1, nodes }). %% public API start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). connection_params() -> gen_server:call(?MODULE, connection_params). %% gen_server API init(_Args) -> Config = application:get_env(egara, riak, []), Nodes = sanitize_node_config(proplists:get_value(nodes, Config, missing)), State = #state{ nodes = Nodes }, { Host, Port } = erlang:hd(Nodes), { ok, Connection } = riakc_pb_socket:start_link(Host, Port), BucketTypes = proplists:get_value(bucket_types, Config, []), setup_bucket_types(Connection, BucketTypes), riakc_pb_socket:stop(Connection), { ok, State }. handle_call(connection_params, _From, State) -> next_node(State); handle_call(_, _, State) -> { reply, ok, State}. handle_cast(_Msg, State) -> { noreply, State }. handle_info(_Info, State) -> { noreply, State }. terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> { ok, State }. %% private API sanitize_node_config(List) when is_list(List) -> Sanitizer = fun({ Host, Port } = Tuple, Acc) when is_list(Host), is_integer(Port) -> [ Tuple | Acc ]; (_, Acc) -> Acc end, case lists:foldl(Sanitizer, [], List) of [] -> sanitize_node_config(missing); Config -> Config end; sanitize_node_config(missing) -> [ { "localhost", 10017 } ]. next_node(#state{ pos = Pos, nodes = Nodes } = State) when Pos > length(State#state.nodes) -> [ Tuple | _ ] = Nodes, { reply, Tuple, State#state{ pos = 2 } }; next_node(#state{ pos = Pos, nodes = Nodes } = State) -> Tuple = lists:nth(Pos, Nodes), { reply, Tuple, State#state{ pos = Pos + 1 } }. %%TODO: current almost entirely useless since the API does not allow creating new bucket types setup_bucket_types(Connection, [{ BucketList, Properties } | Tail]) -> - Bucket = list_to_binary(BucketList), + Bucket = egara_utils:as_binary(BucketList), CurrentTypeProperties = riakc_pb_socket:get_bucket_type(Connection, Bucket), case CurrentTypeProperties of { ok, _ } -> %% TODO check that the properties actually match up so as not to set the type up EVERY time riakc_pb_socket:set_bucket_type(Connection, Bucket, Properties); { error, _ } -> lager:info("Missing Riak bucket type ~p, intended to have properties ~p", [Bucket, CurrentTypeProperties]), riakc_pb_socket:set_bucket_type(Connection, Bucket, Properties) end, setup_bucket_types(Connection, Tail); setup_bucket_types(_Connection, []) -> ok. diff --git a/apps/egara/src/egara_storage.erl b/apps/egara/src/egara_storage.erl index b1374c7..6cbb06e 100644 --- a/apps/egara/src/egara_storage.erl +++ b/apps/egara/src/egara_storage.erl @@ -1,182 +1,182 @@ %% Copyright 2014 Kolab Systems AG (http://www.kolabsys.com) %% %% Aaron Seigo (Kolab Systems) %% %% This program is free software: you can redistribute it and/or modify %% it under the terms of the GNU General Public License as published by %% the Free Software Foundation, either version 3 of the License, or %% (at your option) any later version. %% %% This program is distributed in the hope that it will be useful, %% but WITHOUT ANY WARRANTY; without even the implied warranty of %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the %% GNU General Public License for more details. %% %% You should have received a copy of the GNU General Public License %% along with this program. If not, see . -module(egara_storage). -behaviour(gen_server). %% API -export([ start_link/0, store_notification/3, store_userdata/3, store_message_history_entry/3, fetch_userdata_for_login/2, store_folder_uid/3, fetch_folder_uid/2 ]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% state record definition -record(state, { riak_connection = none }). %% public API start_link() -> gen_server:start_link(?MODULE, [], []). store_notification(Pid, Key, Notification) -> gen_server:call(Pid, { store_notification, Key, Notification }). store_userdata(Pid, UserLogin, UserData) -> gen_server:call(Pid, { store_userdata, UserLogin, UserData }). fetch_userdata_for_login(Pid, UserLogin) -> gen_server:call(Pid, { fetch_userdata, UserLogin }). store_folder_uid(Pid, Folder, UID) when is_binary(Folder), is_binary(UID) -> gen_server:call(Pid, { store_folder_uid, Folder, UID }). fetch_folder_uid(Pid, Folder) when is_binary(Folder) -> gen_server:call(Pid, { fetch_folder_uid, Folder }). store_message_history_entry(Pid, Key, Value) when is_binary(Key), is_binary(Value) -> gen_server:call(Pid, { store_message_history_entry, Key, Value }). %% gen_server API init(_) -> erlang:process_flag(trap_exit, true), { ok, #state {} }. handle_call({ store_notification, Keys, Notification }, _From, State) when is_list(Keys) -> lager:info("Notification----> ~p = ~p", [Keys, Notification]), NewState = ensure_connected(State), Json = jsx:encode(Notification), store_notification_json(Keys, Json, NewState, {}); handle_call({ store_notification, Key, Notification }, _From, State) when is_binary(Key) -> lager:info("Notification----> ~p = ~p", [Key, Notification]), NewState = ensure_connected(State), Json = jsx:encode(Notification), store_notification_json(Key, Json, NewState); handle_call({ store_userdata, UserLogin, UserData }, From, State) when is_list(UserLogin) -> - handle_call({ store_userdata, erlang:list_to_binary(UserLogin), UserData }, From, State); + handle_call({ store_userdata, egara_utils:as_binary(UserLogin), UserData }, From, State); handle_call({ store_userdata, UserLogin, UserData }, _From, State) -> UserId = proplists:get_value(<<"id">>, UserData, ""), TS = egara_utils:current_timestamp(), Key = <>, Json = jsx:encode([ { <<"user">>, UserLogin } | UserData ]), Storable = riakc_obj:new(historical_users_bucket(), Key, Json, json_type()), NewState = ensure_connected(State), case riakc_pb_socket:put(NewState#state.riak_connection, Storable) of ok -> CurrentStorable = riakc_obj:new(current_users_bucket(), UserLogin, Json, json_type()), case riakc_pb_socket:put(NewState#state.riak_connection, CurrentStorable) of ok -> { reply, ok, NewState }; Rv -> lager:warning("Failed to store current user data: ~p", [Rv]), { reply, error, NewState } end; Rv -> lager:warning("Failed to store user data: ~p", [Rv]), { reply, error, NewState } end; handle_call({ fetch_userdata, UserLogin }, From, State) when is_list(UserLogin) -> - handle_call({ store_userdata, erlang:list_to_binary(UserLogin) }, From, State); + handle_call({ store_userdata, egara_utils:as_binary(UserLogin) }, From, State); handle_call({ fetch_userdata, UserLogin }, _From, State) when is_binary(UserLogin) -> NewState = ensure_connected(State), RiakResponse = riakc_pb_socket:get(NewState#state.riak_connection, current_users_bucket(), UserLogin), Response = case RiakResponse of { ok, Obj } -> Value = riakc_obj:get_value(Obj), try jsx:decode(Value) of Term -> Term catch error:_ -> notfound end; _ -> notfound end, { reply, Response, NewState }; handle_call({ store_folder_uid, Folder, UID }, _From, State) when is_binary(Folder), is_binary(UID) -> NewState = ensure_connected(State), Storable = riakc_obj:new(current_folders_bucket(), Folder, UID), Rv = riakc_pb_socket:put(NewState#state.riak_connection, Storable), { reply, Rv, NewState }; handle_call({ fetch_folder_uid, Folder }, From, State) when is_list(Folder) -> - handle_call({ fetch_folder_uid, list_to_binary(Folder) }, From, State); + handle_call({ fetch_folder_uid, egara_utils:as_binary(Folder) }, From, State); handle_call({ fetch_folder_uid, Folder }, _From, State) when is_binary(Folder) -> NewState = ensure_connected(State), RiakResponse = riakc_pb_socket:get(NewState#state.riak_connection, current_folders_bucket(), Folder), Response = case RiakResponse of { ok, Obj } -> riakc_obj:get_value(Obj); _ -> notfound end, { reply, Response, NewState }; handle_call({ store_message_history_entry, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) -> %%lager:info("Storing history entry ~p => ~p", [Key, Value]), NewState = ensure_connected(State), Storable = riakc_obj:new(message_timeline_bucket(), Key, Value), Rv = riakc_pb_socket:put(NewState#state.riak_connection, Storable), { reply, Rv, NewState }; handle_call(_Request, _From, State) -> { reply, ok, State }. handle_cast(_Msg, State) -> { noreply, State }. handle_info({'EXIT', _ParentPid, shutdown}, _State) -> exit(shutdown); handle_info({'EXIT', From, _Reason}, State) -> %% look out for our riak connection dropping if From =:= State#state.riak_connection -> lager:warning("Just lost our riak connection..."), { noreply, State#state{ riak_connection = none } }; true -> { noreply, State } end; handle_info(_Info, State) -> { noreply, State }. terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> { ok, State }. %% private API ensure_connected(#state{ riak_connection = none } = State) -> { Host, Port } = egara_riak_config:connection_params(), %%lager:info("Going to try with ... ~p ~p ~p", [Host, Port, State#state.riak_connection]), case riakc_pb_socket:start_link(Host, Port) of { ok, Connection } -> State#state{ riak_connection = Connection }; Actual -> lager:warning("COULD NOT CONNECT TO RIAK! Reason: ~p", [Actual]), State end; ensure_connected(State) -> State. store_notification_json([], _Json, _State, Reply) -> Reply; store_notification_json([Key|Keys], Json, State, _ReplyPlaceholder) -> Reply = store_notification_json(Key, Json, State), store_notification_json(Keys, Json, State, Reply). store_notification_json(Key, Json, State) when is_binary(Key) -> %%lager:info("Going to store ~p", [Key]), Storable = riakc_obj:new(notification_bucket(), Key, Json, json_type()), case riakc_pb_socket:put(State#state.riak_connection, Storable) of ok -> { reply, ok, State }; Rv -> lager:warning("Failed to put notification: ~p", [Rv]), { reply, error, State } end. historical_users_bucket() -> { <<"egara-lww">>, <<"users">> }. current_users_bucket() -> { <<"egara-unique">>, <<"users-current">> }. notification_bucket() -> { <<"egara-lww">>, <<"imap-events">> }. %%historical_folders_bucket() -> { <<"egara-lww">>, <<"imap-folders">> }. current_folders_bucket() -> { <<"egara-unique">>, <<"imap-folders-current">> }. message_timeline_bucket() -> { <<"egara-lww">>, <<"imap-message-timeline">> }. json_type() -> "application/json". diff --git a/apps/egara/src/incoming_handlers/egara_incoming_cyrus_imap.erl b/apps/egara/src/incoming_handlers/egara_incoming_cyrus_imap.erl index 0caab08..e7392f6 100644 --- a/apps/egara/src/incoming_handlers/egara_incoming_cyrus_imap.erl +++ b/apps/egara/src/incoming_handlers/egara_incoming_cyrus_imap.erl @@ -1,136 +1,136 @@ %% Copyright 2014 Kolab Systems AG (http://www.kolabsys.com) %% %% Aaron Seigo (Kolab Systems) %% %% This program is free software: you can redistribute it and/or modify %% it under the terms of the GNU General Public License as published by %% the Free Software Foundation, either version 3 of the License, or %% (at your option) any later version. %% %% This program is distributed in the hope that it will be useful, %% but WITHOUT ANY WARRANTY; without even the implied warranty of %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the %% GNU General Public License for more details. %% %% You should have received a copy of the GNU General Public License %% along with this program. If not, see . -module(egara_incoming_cyrus_imap). -behavior(egara_incoming_handler). %% API -export([start_reception/0, launchRecvCyrusNotification/1, recvCyrusNotification/3]). -include_lib("procket/include/procket.hrl"). -include_lib("kernel/include/file.hrl"). start_reception() -> %% get the path to the listen socket, either from the app config or here DefaultSocketPath = "/var/lib/imap/socket/notify", case application:get_env(egara, cyrus) of { ok, Config } when is_list(Config) -> SocketPath = proplists:get_value(notification_socket_path, Config, DefaultSocketPath); _ -> SocketPath = DefaultSocketPath end, lager:info("Listening for cyrus-imap events on socket ~p", [SocketPath]), %% see if the file exists, and if it does, remove it if it is a socket %% allows to start the application multiple times, which is a good thing CleanUp = case file:read_file_info(SocketPath) of { ok, #file_info{ type = other } } -> ok = file:delete(SocketPath); %% TODO: handle errror with a report { ok, _ } -> error; %% do not remove a non-socket file. TODO: handle errror with a report, clean exit { error, _ } -> ok end, - if CleanUp =:= ok -> bindSocket(list_to_binary(SocketPath)), ok; + if CleanUp =:= ok -> bindSocket(egara_utils:as_binary(SocketPath)), ok; true -> error end. bindSocket(SocketPath) when is_binary(SocketPath) -> inert:start(), { ok, Socket } = procket:socket(?PF_LOCAL, ?SOCK_DGRAM, 0), Sun = <>, case procket:bind(Socket, Sun) of ok -> file:change_mode(SocketPath, 8#777), %% FIXME: tighten these permissions down? at least complain on error spawn(fun() -> supervisor:start_child(egara_sup, { ?MODULE, { ?MODULE, launchRecvCyrusNotification, [Socket] }, permanent, 5000, worker, [?MODULE]}) end), ok; { error, PosixError } -> lager:error("Could not bind to notification socket; error is: ~p", [PosixError]), error end. %% silly little thing that grabs the second-to-last item; very specific to what cyrus throws at us %% TODO: could be made more efficient by just going N items in since we "know" the format? cherryPickNotification([], LastTerm, _) -> LastTerm; cherryPickNotification(Terms, _, ThisTerm) -> [H|T] = Terms, cherryPickNotification(T, ThisTerm, H). cherryPickNotification(Terms) -> [H|T] = Terms, cherryPickNotification(T, null, H). launchRecvCyrusNotification(Socket) -> % this mapping gets passed to proplists:expand to translate/normalize event names EventMappings = [ { { <<"event">>, <<"vnd.cmu.MessageCopy">> }, { <<"event">>, <<"MessageCopy">>} }, { { <<"event">>, <<"vnd.cmu.MessageMove">> }, { <<"event">>, <<"MessageMove">>} } ], { ok, spawn_link(?MODULE, recvCyrusNotification, [Socket, EventMappings, none]) }. recvCyrusNotification(Socket, EventMappings, JsonContinuation) -> { ok, read } = inert:poll(Socket), case procket:recvfrom(Socket, 16#FFFF) of { error, eagain } -> timer:sleep(100), recvCyrusNotification(Socket, EventMappings, JsonContinuation); { ok, Buf } -> Components = binary:split(Buf, <<"\0">>, [global]), %%lager:info("~p", [Components]), Json = cherryPickNotification(Components), NewJsonContinuation = decode(Json, EventMappings, JsonContinuation), recvCyrusNotification(Socket, EventMappings, NewJsonContinuation) end. %% returns the next continuation, or none if .. well .. none decode(Json, EventMappings, none) -> try jsx:decode(Json, [stream]) of { incomplete, F } -> check_complete(F, EventMappings); Term -> notification_received(Term, EventMappings), none catch error:_ -> none end; decode(Json, EventMappings, Continuation) -> try Continuation(Json) of { incomplete, F } -> check_complete(F, EventMappings); Term -> notification_received(Term, EventMappings), none catch error:_ -> none end. check_complete(Continuation, EventMappings) -> try Continuation(end_stream) of Term -> notification_received(Term, EventMappings), none catch error:_ -> Continuation end. notification_received(Term, EventMappings) -> WithUtcTimestamp = addUtcTimestamp(Term, proplists:get_value(<<"timestamp">>, Term)), egara_notifications_receiver:notification_received(proplists:expand(EventMappings, WithUtcTimestamp)). addUtcTimestamp(Term, undefined) -> Timestamp = egara_utils:current_timestamp(), [{ <<"timestamp_utc">>, Timestamp }|[{ <<"timestamp">>, Timestamp }|Term]]; addUtcTimestamp(Term, Timestamp) -> UtcTimestamp = egara_utils:normalize_timestamp(Timestamp), [{ <<"timestamp_utc">>, UtcTimestamp }|Term]. diff --git a/apps/egara/src/lib/egara_notification_queue.erl b/apps/egara/src/lib/egara_notification_queue.erl index 261b61c..4b4e17e 100644 --- a/apps/egara/src/lib/egara_notification_queue.erl +++ b/apps/egara/src/lib/egara_notification_queue.erl @@ -1,206 +1,206 @@ %% Copyright 2014 Kolab Systems AG (http://www.kolabsys.com) %% %% Aaron Seigo (Kolab Systems) %% %% This program is free software: you can redistribute it and/or modify %% it under the terms of the GNU General Public License as published by %% the Free Software Foundation, either version 3 of the License, or %% (at your option) any later version. %% %% This program is distributed in the hope that it will be useful, %% but WITHOUT ANY WARRANTY; without even the implied warranty of %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the %% GNU General Public License for more details. %% %% You should have received a copy of the GNU General Public License %% along with this program. If not, see . -module(egara_notification_queue). -export([ install/1, start/0, notification/1, next_unassigned/0, add/2, remove/1, remove_all/0, assign_one/1, assigned_to/1, release/1, release/2, release_orphaned/0, release_all/0, migrate_failures/0, max_key/0, list/0]). -include_lib("stdlib/include/qlc.hrl"). -record(egara_incoming_notification, { id, node = undefined, claimed_by = undefined, fails = 0, term = undefined }). -record(egara_incoming_notification_fails, { id, node, term }). -define(MAX_FAILURES, 5). install(Nodes) -> rpc:multicall(Nodes, application, stop, [mnesia]), %% TODO: tune mnesia for large batches of writes; see -> http://streamhacker.com/2008/12/10/how-to-eliminate-mnesia-overload-events/ mnesia:create_schema(Nodes), rpc:multicall(Nodes, application, start, [mnesia]), %% create index for claimed_by try mnesia:create_table(egara_incoming_notification, [{ attributes, record_info(fields, egara_incoming_notification) }, { type, ordered_set }, { disc_copies, Nodes }]), mnesia:create_table(egara_incoming_notification_fails, [{ attributes, record_info(fields, egara_incoming_notification_fails) }, { disc_copies, Nodes }]) of _ -> ok catch error:_ -> ok end. start() -> mnesia:wait_for_tables([egara_incoming_notification], 5000). add(Key, Term) -> %lager:debug("Adding notification to the queue. Key: ~p => Value: ~p", [Key, Term]), F = fun() -> Rec = #egara_incoming_notification{ id = Key, term = Term }, mnesia:write(Rec) end, mnesia:activity(transaction, F). remove(Key) -> F = fun() -> try mnesia:delete({ egara_incoming_notification, Key }) of _ -> ok catch error:Error -> {error, caught, Error} end end, mnesia:activity(transaction, F). remove_all() -> mnesia:clear_table(egara_incoming_notification). notification(Key) -> F = fun() -> case mnesia:read(egara_incoming_notification, Key) of [Record] -> Record; [] -> #egara_incoming_notification{ id = 0 } end end, mnesia:activity(transaction, F). release(Key, ProcessKey) -> %%TODO: make this more efficient by using qlc; currently scans whole table! Pattern = #egara_incoming_notification{ _ = '_', claimed_by = ProcessKey, id = Key }, F = fun() -> %% check if claimed_by is set and if so if the process is still running case mnesia:match_object(Pattern) of [#egara_incoming_notification{ term = Term }] -> mnesia:write(#egara_incoming_notification{ id = Key, term = Term, claimed_by = undefined }); [] -> { error, notfound} end end, mnesia:activity(transaction, F). release(Key) -> F = fun() -> case mnesia:read(egara_incoming_notification, Key) of [#egara_incoming_notification{ term = Term }] -> mnesia:write(#egara_incoming_notification{ id = Key, term = Term, claimed_by = undefined }), ok; [] -> false end end, mnesia:activity(transaction, F). release_orphaned() -> F = fun() -> QH = qlc:q([ Record || #egara_incoming_notification{ claimed_by = ProcessKey } = Record <- mnesia:table(egara_incoming_notification), ProcessKey =/= undefined, syn:find_by_key(ProcessKey) =:= undefined ]), qlc:fold(fun(Record, N) -> Failed = Record#egara_incoming_notification.fails + 1, mnesia:write(Record#egara_incoming_notification{ claimed_by = undefined, fails = Failed }), N + 1 end, 0, QH) end, mnesia:activity(transaction, F). migrate_failures() -> F = fun() -> Timestamp = egara_utils:current_timestamp(), QH = qlc:q([ Record || #egara_incoming_notification{ fails = Fails } = Record <- mnesia:table(egara_incoming_notification), Fails >= ?MAX_FAILURES ]), qlc:fold(fun(#egara_incoming_notification{ id = Key, node = Node, term = Term }, N) -> - IntBin = integer_to_binary(N), + IntBin = egara_utils:as_binary(N), mnesia:write(#egara_incoming_notification_fails{ id = <>, node = Node, term = Term }), remove(Key), N + 1 end, 0, QH) end, mnesia:activity(transaction, F). release_all() -> F = fun() -> QH = qlc:q([ Rec|| Rec <- mnesia:table(egara_incoming_notification), Rec#egara_incoming_notification.claimed_by =/= undefined ]), qlc:fold(fun(Rec, N) -> mnesia:write(Rec#egara_incoming_notification{ claimed_by = undefined }), N + 1 end, 0, QH) end, mnesia:activity(transaction, F). assigned_to(Key) -> F = fun() -> QH = qlc:q([ Rec || Rec <- mnesia:table(egara_incoming_notification), Rec#egara_incoming_notification.id =:= Key]), QC = qlc:cursor(QH), Answers = qlc:next_answers(QC, 1), case Answers of [Record|_] -> Record#egara_incoming_notification.claimed_by; _ -> notfound end end, mnesia:activity(transaction, F). assign_one(ProcessKey) -> F = fun() -> QH = qlc:q([ Rec || Rec <- mnesia:table(egara_incoming_notification), Rec#egara_incoming_notification.claimed_by =:= undefined, Rec#egara_incoming_notification.fails < ?MAX_FAILURES]), QC = qlc:cursor(QH), Answers = qlc:next_answers(QC, 1), case Answers of [Record|_] -> mnesia:write(Record#egara_incoming_notification{ claimed_by = ProcessKey }), incoming_as_ext_tuple(Record); _ -> notfound end end, mnesia:activity(transaction, F). list() -> F = fun() -> QH = qlc:q([ Rec || Rec <- mnesia:table(egara_incoming_notification) ]), QC = qlc:cursor(QH), qlc:next_answers(QC) end, mnesia:activity(transaction, F). incoming_as_ext_tuple(Record) -> { Record#egara_incoming_notification.id, Record#egara_incoming_notification.node, Record#egara_incoming_notification.term }. next_unassigned([]) -> none; next_unassigned([H|_]) -> H#egara_incoming_notification.id; next_unassigned(_) -> error. next_unassigned() -> F = fun() -> QH = qlc:q([ Rec || Rec <- mnesia:table(egara_incoming_notification), Rec#egara_incoming_notification.claimed_by =:= undefined ]), QC = qlc:cursor(QH), Answers = qlc:next_answers(QC, 1), next_unassigned(Answers) end, mnesia:activity(transaction, F). max_key() -> F = fun() -> case mnesia:last(egara_incoming_notification) of '$end_of_table' -> 0; Key -> Key end end, mnesia:activity(transaction, F). diff --git a/apps/egara/src/lib/egara_utils.erl b/apps/egara/src/lib/egara_utils.erl index 3a7a137..8737a9d 100644 --- a/apps/egara/src/lib/egara_utils.erl +++ b/apps/egara/src/lib/egara_utils.erl @@ -1,25 +1,27 @@ %% Copyright 2014 Kolab Systems AG (http://www.kolabsys.com) %% %% Aaron Seigo (Kolab Systems) %% %% This program is free software: you can redistribute it and/or modify %% it under the terms of the GNU General Public License as published by %% the Free Software Foundation, either version 3 of the License, or %% (at your option) any later version. %% %% This program is distributed in the hope that it will be useful, %% but WITHOUT ANY WARRANTY; without even the implied warranty of %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the %% GNU General Public License for more details. %% %% You should have received a copy of the GNU General Public License %% along with this program. If not, see . -module(egara_utils). -export([ current_timestamp/0, normalize_timestamp/1, as_binary/1 ]). current_timestamp() -> iso8601:format(os:timestamp()). normalize_timestamp(Timestamp) -> iso8601:format(iso8601:parse_exact(Timestamp)). as_binary(Value) when is_binary(Value) -> Value; -as_binary(Value) when is_list(Value) -> erlang:list_to_binary(Value). +as_binary(Value) when is_list(Value) -> erlang:list_to_binary(Value); +as_binary(Value) when is_integer(Value) -> erlang:integer_to_binary(Value); +as_binary(Value) when is_float(Value) -> erlang:float_to_binary(Value).