Page MenuHomePhorge

No OneTemporary

Authored By
Unknown
Size
6 KB
Referenced Files
None
Subscribers
None
diff --git a/apps/egara/src/egara_worker.erl b/apps/egara/src/egara_worker.erl
index d2d0e9a..78163fd 100644
--- a/apps/egara/src/egara_worker.erl
+++ b/apps/egara/src/egara_worker.erl
@@ -1,149 +1,154 @@
%% Copyright 2014 Kolab Systems AG (http://www.kolabsys.com)
%%
%% Aaron Seigo (Kolab Systems) <seigo a kolabsys.com>
%%
%% This program is free software: you can redistribute it and/or modify
%% it under the terms of the GNU General Public License as published by
%% the Free Software Foundation, either version 3 of the License, or
%% (at your option) any later version.
%%
%% This program is distributed in the hope that it will be useful,
%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
%% GNU General Public License for more details.
%%
%% You should have received a copy of the GNU General Public License
%% along with this program. If not, see <http://www.gnu.org/licenses/>.
-module(egara_worker).
-behaviour(gen_server).
%% API
-export([ start_link/1
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(BATCH_SIZE, 500).
start_link(Args) -> gen_server:start_link(?MODULE, Args, []).
init(_Args) ->
%% EventMapping is a map of event types (e.g. <<"FlagsClear">>) to the type of
%% event (e.g. imap_mesage_event) for routing the notifications through the handler
EventMapping = transform_events_config_to_map(application:get_env(events_to_track)),
%%lager:info("We gots us ... ~p", [EventMapping]),
{ ok, EventMapping }.
handle_call(_Request, _From, State) ->
{ reply, ok, State }.
handle_cast(process_events, State) ->
case poolboy:checkout(egara_storage_pool, false, 10) of
Storage when is_pid(Storage) ->
%%lager:info("Storing using ~p", [Storage]),
process_as_many_events_as_possible(Storage, State, ?BATCH_SIZE),
poolboy:checkin(egara_storage_pool, Storage);
_ ->
lager:warning("Unable to get storage!")
end,
{ noreply, State };
handle_cast(_Msg, State) ->
{ noreply, State }.
handle_info(_Info, State) ->
{ noreply, State }.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ ok, State }.
%% private API
add_events_to_map(Type, Events, EventMap) when is_list(Events) ->
lists:foldl(fun(Event, Map) -> maps:put(Event, Type, Map) end, EventMap, Events);
add_events_to_map(_Type, _Events, EventMap) ->
EventMap.
transform_events_config_to_map({ ok, EventConfig }) ->
lists:foldl(fun({ Type, Events }, EventMap) -> add_events_to_map(Type, Events, EventMap) end, maps:new(), EventConfig);
transform_events_config_to_map(_) ->
maps:new(). %% return an empty map .. this is going to be boring
process_as_many_events_as_possible(_Storage, _EventMapping, 0) ->
egara_notifications_processor:queue_drained(),
ok;
process_as_many_events_as_possible(Storage, EventMapping, N) ->
Status = egara_notification_queue:assign_next(self()),
%%lager:info("~p is starting to process... ~p", [self(), Key]),
case notification_assigned(Storage, EventMapping, Status) of
again -> process_as_many_events_as_possible(Storage, EventMapping, N - 1);
_ -> ok
end.
notification_assigned(_Storage, _EventMapping, notfound) ->
%%lager:info("Checking in ~p", [self()]),
poolboy:checkin(egara_notification_workers, self()),
egara_notifications_processor:queue_drained(),
done;
notification_assigned(Storage, EventMapping, { Key, Notification } ) ->
EventType = proplists:get_value(<<"event">>, Notification),
EventCategory = maps:find(EventType, EventMapping),
%%lager:info("Type is ~p which maps to ~p", [EventType, EventCategory]),
case process_notification_by_category(Storage, Notification, EventCategory) of
- ok -> %%lager:info("Done with ~p", [Key]),
+ ok ->
+ %%lager:info("Done with ~p", [Key]),
+ egara_notification_queue:remove(Key),
+ again;
+ ignoring ->
+ %%lager:info("Ignoring ~p", [Key]),
egara_notification_queue:remove(Key),
again;
_ -> error
end.
process_notification_by_category(Storage, Notification, { ok, Type }) ->
%% this version, with the { ok, _ } tuple is called due to maps:find returning { ok, Value }
%% it is essentiall a forwarder to other impls below
NotificationWithUsername = ensure_username(Storage, Notification, proplists:get_value(<<"user">>, Notification)),
process_notification_by_category(Storage, NotificationWithUsername, Type);
process_notification_by_category(Storage, Notification, imap_message_event) ->
lager:info("storing an imap_message_event"),
Key = <<"TODO">>, %% TODO!
egara_storage:store_notification(Storage, Key, Notification);
process_notification_by_category(Storage, Notification, imap_mailbox_event) ->
lager:info("storing an imap_mailbox_event"),
Key = <<"TODO">>, %% TODO!
egara_storage:store_notification(Storage, Key, Notification);
process_notification_by_category(Storage, Notification, imap_session_event) ->
lager:info("storing an imap_session_event"),
Key = <<"TODO">>, %% TODO!
egara_storage:store_notification(Storage, Key, Notification);
process_notification_by_category(Storage, Notification, imap_quota_event) ->
lager:info("storing an imap_quota_event"),
Key = <<"TODO">>, %% TODO!
egara_storage:store_notification(Storage, Key, Notification);
process_notification_by_category(_Storage, _Notification, _) ->
%% in here we have a notification we don't recognize, probably because it was not configured
%% to be watched for
ignoring.
ensure_username(Storage, Notification, { ok, UserLogin } ) ->
FromStorage = egara_storage:fetch_userdata_for_login(Storage, UserLogin),
lager:info("Storage said ... ~p", [FromStorage]),
add_username_from_storage(Storage, Notification, UserLogin, FromStorage);
ensure_username(_Storage, Notification, _) ->
Notification.
add_username_from_storage(Storage, Notification, UserLogin, notfound) ->
%% TODO: LDAP worker to
FromLDAP = notfound,
add_username_from_ldap(Storage, Notification, UserLogin, FromLDAP);
add_username_from_storage(Notification, _Storage, _UserLogin, Username) ->
Notification ++ [ <<"user_id">>, Username ].
add_username_from_ldap(Notification, _Storage, _UserLogin, notfound) ->
Notification;
add_username_from_ldap(Storage, Notification, UserLogin, UserData) ->
%% TODO: storage in user butcket
egara_storage:store_userdata(Storage, UserLogin, UserData),
Notification ++ [ <<"user_id">>, UserData ].

File Metadata

Mime Type
text/x-diff
Expires
Sat, Apr 4, 5:08 AM (1 d, 22 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18822660
Default Alt Text
(6 KB)

Event Timeline