Changeset View
Changeset View
Standalone View
Standalone View
src/eimap.erl
Show First 20 Lines • Show All 140 Lines • ▼ Show 20 Lines | disconnected({ connect, Receiver, ResponseToken }, #state{ command_queue = CommandQueue, host = Host, port = Port, tls = TLS, socket = undefined } = State) -> | ||||
{ Message, ResponseType } = eimap_command_capability:new_command(parse_serverid), | { Message, ResponseType } = eimap_command_capability:new_command(parse_serverid), | ||||
Command = #command{ message = Message, response_type = ResponseType, | Command = #command{ message = Message, response_type = ResponseType, | ||||
from = SendCapabilitiesTo, response_token = { connected, Receiver, ResponseToken }, | from = SendCapabilitiesTo, response_token = { connected, Receiver, ResponseToken }, | ||||
parse_state = eimap_command_capability }, | parse_state = eimap_command_capability }, | ||||
{ next_state, wait_response, State#state { socket = Socket, tls_state = TlsState, current_command = Command, command_queue = NewCommandQueue } }; | { next_state, wait_response, State#state { socket = Socket, tls_state = TlsState, current_command = Command, command_queue = NewCommandQueue } }; | ||||
disconnected(Command, State) when is_record(Command, command) -> | disconnected(Command, State) when is_record(Command, command) -> | ||||
{ next_state, disconnected, enque_command(Command, State) }. | { next_state, disconnected, enque_command(Command, State) }. | ||||
%Throttle if the target process is getting overloaded. | |||||
%This allows the socket buffers to fill up eventually, | |||||
%so regular congestion control can kick in. | |||||
throttle(Receiver, Limit) -> | |||||
{_, MessageQueueLength} = process_info(Receiver, message_queue_len), | |||||
if | |||||
MessageQueueLength > Limit -> | |||||
SleepDuration = trunc(MessageQueueLength / 2), | |||||
lager:debug("Throttling for ~p with queue length ~p", [SleepDuration, MessageQueueLength]), | |||||
timer:sleep(SleepDuration), | |||||
throttle(Receiver, Limit); | |||||
true -> false | |||||
end. | |||||
passthrough(flush_passthrough_buffer, #state{ passthrough_send_buffer = Buffer } = State) -> | passthrough(flush_passthrough_buffer, #state{ passthrough_send_buffer = Buffer } = State) -> | ||||
%lager:info("Passing through ~p", [Buffer]), | %lager:info("Passing through ~p", [Buffer]), | ||||
passthrough({ passthrough, Buffer }, State#state{ passthrough_send_buffer = <<>> }); | passthrough({ passthrough, Buffer }, State#state{ passthrough_send_buffer = <<>> }); | ||||
passthrough({ passthrough, Data }, #state{ socket = Socket, tls_state = true } = State) -> | passthrough({ passthrough, Data }, #state{ socket = Socket, tls_state = true } = State) -> | ||||
%lager:info("Passing through ssl \"~s\"", [Data]), | %lager:info("Passing through ssl \"~s\"", [Data]), | ||||
ssl:send(Socket, deflated(Data, State)), | ssl:send(Socket, deflated(Data, State)), | ||||
{ next_state, passthrough, State }; | { next_state, passthrough, State }; | ||||
passthrough({ passthrough, Data }, #state{ socket = Socket } = State) -> | passthrough({ passthrough, Data }, #state{ socket = Socket } = State) -> | ||||
%lager:info("Passing through tcp \"~s\"", [Data]), | %lager:info("Passing through tcp \"~s\"", [Data]), | ||||
gen_tcp:send(Socket, deflated(Data, State)), | gen_tcp:send(Socket, deflated(Data, State)), | ||||
{ next_state, passthrough, State }; | { next_state, passthrough, State }; | ||||
passthrough({ data, Data }, #state{ passthrough_recv = Receiver } = State) -> | passthrough({ data, Data }, #state{ passthrough_recv = Receiver } = State) -> | ||||
%lager:info("Passing back ~p", [Data]), | %lager:info("Passing back ~p", [Data]), | ||||
throttle(Receiver, 100), | |||||
Receiver ! { imap_server_response, Data }, | Receiver ! { imap_server_response, Data }, | ||||
{ next_state, passthrough, State }; | { next_state, passthrough, State }; | ||||
passthrough(Command, State) when is_record(Command, command) -> | passthrough(Command, State) when is_record(Command, command) -> | ||||
NewState = ensure_process_command_queue(State), | NewState = ensure_process_command_queue(State), | ||||
{ next_state, idle, enque_command(Command, NewState) }. | { next_state, idle, enque_command(Command, NewState) }. | ||||
idle(process_command_queue, #state{ command_queue = Queue } = State) -> | idle(process_command_queue, #state{ command_queue = Queue } = State) -> | ||||
UnguardedState = State#state{ process_command_queue_guard = false }, | UnguardedState = State#state{ process_command_queue_guard = false }, | ||||
▲ Show 20 Lines • Show All 347 Lines • Show Last 20 Lines |