Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F117750110
mupdate.c
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Authored By
Unknown
Size
66 KB
Referenced Files
None
Subscribers
None
mupdate.c
View Options
/* mupdate.c -- cyrus murder database master
*
* Copyright (c) 1994-2008 Carnegie Mellon University. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The name "Carnegie Mellon University" must not be used to
* endorse or promote products derived from this software without
* prior written permission. For permission or any legal
* details, please contact
* Carnegie Mellon University
* Center for Technology Transfer and Enterprise Creation
* 4615 Forbes Avenue
* Suite 302
* Pittsburgh, PA 15213
* (412) 268-7393, fax: (412) 268-7395
* innovation@andrew.cmu.edu
*
* 4. Redistributions of any form whatsoever must retain the following
* acknowledgment:
* "This product includes software developed by Computing Services
* at Carnegie Mellon University (http://www.cmu.edu/computing/)."
*
* CARNEGIE MELLON UNIVERSITY DISCLAIMS ALL WARRANTIES WITH REGARD TO
* THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS, IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY BE LIABLE
* FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
* AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
* OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
* $Id: mupdate.c,v 1.114 2010/07/27 19:54:06 wescraig Exp $
*/
#include
<config.h>
#ifdef HAVE_UNISTD_H
#include
<unistd.h>
#endif
#include
<stdio.h>
#include
<string.h>
#include
<ctype.h>
#include
<signal.h>
#include
<stdlib.h>
#include
<syslog.h>
#include
<errno.h>
#include
<netdb.h>
#include
<sys/socket.h>
#include
<netinet/in.h>
#include
<arpa/inet.h>
#include
<sys/types.h>
#include
<sys/ioctl.h>
#if !defined(SIOCGIFCONF) && defined(HAVE_SYS_SOCKIO_H)
# include <sys/sockio.h>
#endif
#include
<net/if.h>
#include
<pthread.h>
#include
<sasl/sasl.h>
#include
<sasl/saslutil.h>
#include
"mupdate.h"
#include
"mupdate-client.h"
#include
"telemetry.h"
#include
"strarray.h"
#include
"assert.h"
#include
"exitcodes.h"
#include
"global.h"
#include
"imap/imap_err.h"
#include
"mailbox.h"
#include
"mboxlist.h"
#include
"mpool.h"
#include
"nonblock.h"
#include
"prot.h"
#include
"tls.h"
#include
"util.h"
#include
"version.h"
#include
"xmalloc.h"
#include
"xstrlcpy.h"
/* Sent to clients that we can't accept a connection for.
 * Used both through prot_printf() (conn_new) and through a raw write()
 * (service_main_fd). */
static const char SERVER_UNABLE_STRING[] = "* BYE \"Server Unable\"\r\n";
static
const
int
NO_NEW_CONNECTION
=
-1
;
static
int
masterp
=
0
;
typedef
enum
{
DOCMD_OK
=
0
,
DOCMD_CONN_FINISHED
=
1
}
mupdate_docmd_result_t
;
enum
{
poll_interval
=
1
,
update_wait
=
5
};
/* One queued mailbox change waiting to be sent to a streaming connection;
 * entries form a singly linked list (struct conn 'plist' / 'ptail'). */
struct pending {
    struct pending *next;               /* next queued change, or NULL */

    char mailbox[MAX_MAILBOX_BUFFER];   /* name of the mailbox that changed */
};
/* Per-client connection state.  Created by conn_new(), destroyed by
 * conn_free(). */
struct conn {
    int fd;                     /* client socket handed to conn_new() */
    int logfd;                  /* -1 when unused; closed by conn_free() otherwise */

    struct protstream *pin;     /* input stream on fd */
    struct protstream *pout;    /* output stream on fd */
    sasl_conn_t *saslconn;      /* created with sasl_server_new() in conn_new() */
    char *userid;               /* non-NULL once authenticated (docmd() tests it) */

#ifdef HAVE_SSL
    SSL *tlsconn;
#else
    void *tlsconn;
#endif
    void *tls_comp;             /* TLS compression method, if any */
    int compress_done;          /* have we done a successful compress? */

    int idle;                   /* conn_free() asserts this is zero */

    char clienthost[NI_MAXHOST*2+1];    /* filled from get_clienthost() */

    struct {
        char *ipremoteport;             /* points at ipremoteport_buf when known */
        char ipremoteport_buf[60];
        char *iplocalport;              /* points at iplocalport_buf when known */
        char iplocalport_buf[60];
        sasl_ssf_t ssf;
        char *authid;                   /* free()d by conn_free() */
    } saslprops;

    /* UPDATE command handling */
    const char *streaming;              /* tag */
    strarray_t *streaming_hosts;        /* partial updates */

    /* pending changes to send, in reverse order */
    pthread_mutex_t m;
    struct pending *plist;
    struct pending *ptail;
    struct conn *updatelist_next;       /* link in the global 'updatelist' */
    struct prot_waitevent *ev;          /* invoked every 'update_wait' seconds
                                           to send out updates */

    /* Prefix for list commands */
    const char *list_prefix;
    size_t list_prefix_len;

    /* For parsing */
    struct buf tag, cmd, arg1, arg2, arg3;

    /* For connection list management */
    struct conn *next;                  /* link in the global 'connlist' */
    struct conn *next_idle;             /* NOTE(review): presumably the link in
                                           'idle_connlist' -- confirm */
};
/* Flag plus condition variable/mutex pair; the waiters and signallers are
 * outside the code visible here. */
static int ready_for_connections = 0;
static pthread_cond_t ready_for_connections_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t ready_for_connections_mutex = PTHREAD_MUTEX_INITIALIZER;

/* service_init() blocks on synced_cond (under synced_mutex) until 'synced'
 * becomes non-zero when running as a slave. */
static int synced = 0;
static pthread_cond_t synced_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t synced_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Listener hand-off between worker threads (see thread_main()). */
static pthread_mutex_t listener_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t listener_cond = PTHREAD_COND_INITIALIZER;
static int listener_lock = 0;

/* if you want to lock both listener and either of these two, you
 * must lock listener first.  You must have both listener_mutex and
 * idle_connlist_mutex locked to remove anything from the idle_connlist */
static pthread_mutex_t idle_connlist_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct conn *idle_connlist = NULL; /* protected by listener_mutex */

/* Number of live connections; incremented in conn_new(), decremented in
 * conn_free(), checked against the configured maximum in service_main_fd(). */
static pthread_mutex_t connection_count_mutex = PTHREAD_MUTEX_INITIALIZER;
static int connection_count = 0;

/* Number of worker threads currently idle (see thread_main()). */
static pthread_mutex_t idle_worker_mutex = PTHREAD_MUTEX_INITIALIZER;
static int idle_worker_count = 0;

/* Total number of worker threads (see thread_main()). */
static pthread_mutex_t worker_count_mutex = PTHREAD_MUTEX_INITIALIZER;
static int worker_count = 0;

/* List of all connections, linked through conn->next. */
static pthread_mutex_t connlist_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct conn *connlist = NULL;

/* ---- connection signaling pipe */
/* Created with pipe() in service_init(); service_main_fd() writes each
 * accepted file descriptor to conn_pipe[1]. */
static int conn_pipe[2];

/* ---- database access ---- */
/* conn_free() holds mailboxes_mutex while unlinking a connection from
 * 'updatelist' (linked through conn->updatelist_next). */
static pthread_mutex_t mailboxes_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct conn *updatelist = NULL;
/* --- prototypes --- */
/* Connection teardown; defined right after conn_new(). */
static void conn_free(struct conn *C);
/* Command parser/dispatcher for one connection. */
static mupdate_docmd_result_t docmd(struct conn *c);
/* Command handlers called from docmd(); each receives the command tag. */
static void cmd_authenticate(struct conn *C,
                             const char *tag, const char *mech,
                             const char *clientstart);
static void cmd_set(struct conn *C,
                    const char *tag, const char *mailbox,
                    const char *server, const char *acl, enum settype t);
static void cmd_find(struct conn *C, const char *tag, const char *mailbox,
                     int send_ok, int send_delete);
static void cmd_list(struct conn *C, const char *tag, const char *host_prefix);
static void cmd_startupdate(struct conn *C, const char *tag,
                            strarray_t *partial);
static void cmd_starttls(struct conn *C, const char *tag);
static void cmd_compress(struct conn *C, const char *tag, const char *alg);
/* Process shutdown; called from fatal() and service_abort(). */
void shut_down(int code);
static int reset_saslconn(struct conn *c);
static void database_init(void);
static void sendupdates(struct conn *C, int flushnow);

/* Defined outside this file. */
extern int saslserver(sasl_conn_t *conn, const char *mech,
                      const char *init_resp, const char *resp_prefix,
                      const char *continuation, const char *empty_chal,
                      struct protstream *pin, struct protstream *pout,
                      int *sasl_result, char **success_data);

/* --- prototypes in mupdate-slave.c */
/* Both are used as pthread start routines in service_init(). */
void *mupdate_client_start(void *rock);
void *mupdate_placebo_kick_start(void *rock);

/* --- main() for each thread */
static void *thread_main(void *rock);
/* --- for config.c */
const
int
config_need_data
=
0
;
/*
 * Allocate and initialise the state for a freshly accepted client socket.
 *
 * The new connection is linked at the head of 'connlist', counted in
 * 'connection_count', and given a SASL server context.
 *
 * fd: the connected client socket; the connection takes it over and
 *     conn_free() closes it.
 *
 * Returns the new connection, or NULL if the SASL context could not be
 * created (the connection has then already been released via conn_free()).
 */
static struct conn *conn_new(int fd)
{
    struct conn *C = xzmalloc(sizeof(struct conn));
    const char *clienthost;
    const char *localip = NULL;
    const char *remoteip = NULL;
    int r;

    C->fd = fd;
    C->logfd = -1;

    C->pin = prot_new(C->fd, 0);
    C->pout = prot_new(C->fd, 1);

    prot_setflushonread(C->pin, C->pout);
    prot_settimeout(C->pin, 180*60);

    /* let stream callbacks find their connection */
    C->pin->userdata = C->pout->userdata = C;

    pthread_mutex_lock(&connlist_mutex); /* LOCK */
    C->next = connlist;
    connlist = C;
    pthread_mutex_unlock(&connlist_mutex); /* UNLOCK */

    pthread_mutex_lock(&connection_count_mutex); /* LOCK */
    connection_count++;
    pthread_mutex_unlock(&connection_count_mutex); /* UNLOCK */

    /* Find out name of client host */
    clienthost = get_clienthost(C->fd, &localip, &remoteip);
    strlcpy(C->clienthost, clienthost, sizeof(C->clienthost));

    if (localip && remoteip) {
        strlcpy(C->saslprops.ipremoteport_buf, remoteip,
                sizeof(C->saslprops.ipremoteport_buf));
        C->saslprops.ipremoteport = C->saslprops.ipremoteport_buf;

        /* The local endpoint must come from 'localip'; the previous code
         * copied 'remoteip' here, so SASL was told that both ends of the
         * connection were the remote address. */
        strlcpy(C->saslprops.iplocalport_buf, localip,
                sizeof(C->saslprops.iplocalport_buf));
        C->saslprops.iplocalport = C->saslprops.iplocalport_buf;
    }

    /* create sasl connection */
    r = sasl_server_new("mupdate",
                        config_servername, NULL,
                        C->saslprops.iplocalport,
                        C->saslprops.ipremoteport,
                        NULL, 0,
                        &C->saslconn);
    if (r != SASL_OK) {
        syslog(LOG_ERR, "failed to start sasl for connection: %s",
               sasl_errstring(r, NULL, NULL));
        prot_printf(C->pout, SERVER_UNABLE_STRING);

        /* conn_free() refuses idle connections */
        C->idle = 0;
        conn_free(C);
        return NULL;
    }

    /* set my allowable security properties */
    sasl_setprop(C->saslconn, SASL_SEC_PROPS,
                 mysasl_secprops(SASL_SEC_NOANONYMOUS));

    /* Clear Buffers */
    memset(&(C->tag), 0, sizeof(struct buf));
    memset(&(C->cmd), 0, sizeof(struct buf));
    memset(&(C->arg1), 0, sizeof(struct buf));
    memset(&(C->arg2), 0, sizeof(struct buf));
    memset(&(C->arg3), 0, sizeof(struct buf));

    return C;
}
/*
 * Tear down a connection: unlink it from 'updatelist' (when streaming) and
 * from 'connlist', decrement 'connection_count', flush and free both
 * protstreams, release TLS/SASL state, close the descriptors and free the
 * structure itself.
 *
 * C: the connection to release; it must not be idle (asserted).
 */
static void conn_free(struct conn *C)
{
    assert(!C->idle); /* Not allowed to free idle connections */

    if (C->streaming) {
        /* remove from updatelist */
        struct conn *upc;

        pthread_mutex_lock(&mailboxes_mutex);

        if (C == updatelist) {
            /* first thing in updatelist */
            updatelist = C->updatelist_next;
        } else {
            /* find the predecessor of C in the update list */
            for (upc = updatelist;
                 upc && upc->updatelist_next != NULL;
                 upc = upc->updatelist_next) {
                if (upc->updatelist_next == C) break;
            }
            /* must find it ! */
            assert(upc != NULL && upc->updatelist_next == C);

            upc->updatelist_next = C->updatelist_next;
        }

        pthread_mutex_unlock(&mailboxes_mutex);
    }

    /* decrease connection counter */
    pthread_mutex_lock(&connection_count_mutex);
    connection_count--;
    pthread_mutex_unlock(&connection_count_mutex);

    /* remove from connlist */
    pthread_mutex_lock(&connlist_mutex); /* LOCK */
    if (C == connlist) {
        connlist = connlist->next;
    } else {
        struct conn *t;

        for (t = connlist; t && t->next != NULL; t = t->next) {
            if (t->next == C) break;
        }
        /* The walk ends either on C's predecessor or on the list tail.
         * The old assert(t != NULL) could never fire, so a connection that
         * was not on the list silently truncated it; insist on a real
         * match, as the updatelist branch above does. */
        assert(t != NULL && t->next == C);
        t->next = C->next;
    }
    pthread_mutex_unlock(&connlist_mutex); /* UNLOCK */

    if (C->ev) prot_removewaitevent(C->pin, C->ev);

    /* push out anything still buffered before the streams go away */
    if (C->pout) prot_flush(C->pout);

    if (C->pin) prot_free(C->pin);
    if (C->pout) prot_free(C->pout);

#ifdef HAVE_SSL
    if (C->tlsconn) tls_reset_servertls(&C->tlsconn);
    /* NOTE(review): called unconditionally for every freed connection,
     * as in the original -- confirm this is intended in a threaded server. */
    tls_shutdown_serverengine();
#endif

    cyrus_close_sock(C->fd);
    if (C->logfd != -1) close(C->logfd);

    if (C->saslconn) sasl_dispose(&C->saslconn);

    /* free(NULL) is a no-op, no guard needed */
    free(C->saslprops.authid);

    /* free struct bufs */
    buf_free(&(C->tag));
    buf_free(&(C->cmd));
    buf_free(&(C->arg1));
    buf_free(&(C->arg2));
    buf_free(&(C->arg3));

    if (C->streaming_hosts) strarray_free(C->streaming_hosts);

    free(C);
}
/*
 * mysasl_proxy_policy() ends up in the auth_*.c backends, which keep
 * static state.  Several worker threads may authenticate at once, so every
 * call is serialised through this mutex.
 */
static pthread_mutex_t proxy_policy_mutex = PTHREAD_MUTEX_INITIALIZER;

/*
 * SASL proxy-policy callback (registered in mysasl_cb): forwards all
 * arguments unchanged to mysasl_proxy_policy() while holding
 * proxy_policy_mutex, and returns its result.
 */
static int mupdate_proxy_policy(sasl_conn_t *conn,
                                void *context,
                                const char *requested_user,
                                unsigned rlen,
                                const char *auth_identity,
                                unsigned alen,
                                const char *def_realm,
                                unsigned urlen,
                                struct propctx *propctx)
{
    int result;

    pthread_mutex_lock(&proxy_policy_mutex);   /* LOCK */

    result = mysasl_proxy_policy(conn, context,
                                 requested_user, rlen,
                                 auth_identity, alen,
                                 def_realm, urlen,
                                 propctx);

    pthread_mutex_unlock(&proxy_policy_mutex); /* UNLOCK */

    return result;
}
/* SASL callback table handed to global_sasl_init() in service_init().
 * The proxy-policy entry uses the mutex-protected wrapper above. */
static struct sasl_callback mysasl_cb[] = {
    { SASL_CB_GETOPT, (mysasl_cb_ft *) &mysasl_config, NULL },
    { SASL_CB_PROXY_POLICY, (mysasl_cb_ft *) &mupdate_proxy_policy, NULL },
    { SASL_CB_LIST_END, NULL, NULL }    /* terminator */
};
/*
* Is the IP address of the given hostname local?
* Returns 1 if local, 0 otherwise.
*/
static
int
islocalip
(
const
char
*
hostname
)
{
struct
hostent
*
hp
;
struct
in_addr
*
haddr
,
*
iaddr
;
struct
ifconf
ifc
;
struct
ifreq
*
ifr
;
char
buf
[
8192
];
/* XXX this limits us to 256 interfaces */
int
sock
,
islocal
=
0
;
if
((
hp
=
gethostbyname
(
hostname
))
==
NULL
)
{
fprintf
(
stderr
,
"unknown host: %s
\n
"
,
hostname
);
return
0
;
}
haddr
=
(
struct
in_addr
*
)
hp
->
h_addr
;
if
((
sock
=
socket
(
AF_INET
,
SOCK_STREAM
,
0
))
==
-1
)
{
fprintf
(
stderr
,
"socket() failed
\n
"
);
return
0
;
}
ifc
.
ifc_buf
=
buf
;
ifc
.
ifc_len
=
sizeof
(
buf
);
if
(
ioctl
(
sock
,
SIOCGIFCONF
,
&
ifc
)
!=
0
)
{
fprintf
(
stderr
,
"ioctl(SIOCGIFCONF) failed: %d
\n
"
,
errno
);
close
(
sock
);
return
0
;
}
for
(
ifr
=
ifc
.
ifc_req
;
ifr
-
ifc
.
ifc_req
<
ifc
.
ifc_len
;
ifr
++
)
{
if
(
ioctl
(
sock
,
SIOCGIFADDR
,
ifr
)
!=
0
)
continue
;
if
(
ioctl
(
sock
,
SIOCGIFFLAGS
,
ifr
)
!=
0
)
continue
;
/* skip any inactive or loopback interfaces */
if
(
!
(
ifr
->
ifr_flags
&
IFF_UP
)
||
(
ifr
->
ifr_flags
&
IFF_LOOPBACK
))
continue
;
iaddr
=
&
(((
struct
sockaddr_in
*
)
&
ifr
->
ifr_addr
)
->
sin_addr
);
/* compare the host address to the interface address */
if
(
!
memcmp
(
haddr
,
iaddr
,
sizeof
(
struct
in_addr
)))
{
islocal
=
1
;
break
;
}
}
close
(
sock
);
return
islocal
;
}
/*
 * run once when process is forked;
 * MUST NOT exit directly; must return with non-zero error code
 *
 * Validates the worker-thread configuration, installs signal handlers,
 * initialises SASL, decides whether we are the master (-m) or should
 * auto-detect it (-a), creates the connection signalling pipe, opens the
 * database, starts the slave client thread (and waits for the initial
 * sync) or the placebo kick thread, and finally starts the initial pool of
 * worker threads.
 *
 * Returns 0 on success or an EC_* code on failure.
 * NOTE(review): the fatal() calls below do terminate the process, which is
 * at odds with the rule stated above; kept as in the original.
 */
int service_init(int argc, char **argv,
                 char **envp __attribute__((unused)))
{
    int i, r, workers_to_start;
    int opt, autoselect = 0;
    pthread_t t;

    if (geteuid() == 0) fatal("must run as the Cyrus user", EC_USAGE);

    /* Do minor configuration checking */
    workers_to_start = config_getint(IMAPOPT_MUPDATE_WORKERS_START);

    if (config_getint(IMAPOPT_MUPDATE_WORKERS_MAX) <
        config_getint(IMAPOPT_MUPDATE_WORKERS_MINSPARE)) {
        syslog(LOG_CRIT, "Maximum total worker threads is less than minimum spare worker threads");
        return EC_SOFTWARE;
    }

    /* This also covers "minimum spare > starting"; the original repeated
     * that test further down, where it could never be reached. */
    if (workers_to_start <
        config_getint(IMAPOPT_MUPDATE_WORKERS_MINSPARE)) {
        syslog(LOG_CRIT, "Starting worker threads is less than minimum spare worker threads");
        return EC_SOFTWARE;
    }

    if (config_getint(IMAPOPT_MUPDATE_WORKERS_MAXSPARE) <
        workers_to_start) {
        syslog(LOG_CRIT, "Maximum spare worker threads is less than starting worker threads");
        return EC_SOFTWARE;
    }

    if (config_getint(IMAPOPT_MUPDATE_WORKERS_MAX) < workers_to_start) {
        syslog(LOG_CRIT, "Maximum total worker threads is less than starting worker threads");
        return EC_SOFTWARE;
    }

    /* set signal handlers */
    signals_set_shutdown(&shut_down);
    signal(SIGPIPE, SIG_IGN);

    global_sasl_init(1, 1, mysasl_cb);

    /* see if we're the master or a slave */
    while ((opt = getopt(argc, argv, "ma")) != EOF) {
        switch (opt) {
        case 'm':
            masterp = 1;
            break;
        case 'a':
            autoselect = 1;
            break;
        default:
            break;
        }
    }

    /* Auto-detect only when a server name is configured; passing NULL to
     * the resolver would crash. */
    if (!masterp && autoselect)
        masterp = config_mupdate_server ? islocalip(config_mupdate_server) : 0;

    if (masterp &&
        config_mupdate_config == IMAP_ENUM_MUPDATE_CONFIG_UNIFIED) {
        /* XXX  We currently prohibit this because mailboxes created
         * on the master will cause local mailbox entries to be propagated
         * to the slave.  We can probably fix this by prepending
         * config_servername onto the entries before updating the slaves.
         */
        fatal("cannot run mupdate master on a unified server", EC_USAGE);
    }

    if (pipe(conn_pipe) == -1) {
        syslog(LOG_ERR, "could not setup connection signaling pipe %m");
        return EC_OSERR;
    }

    database_init();

    if (!masterp) {
        r = pthread_create(&t, NULL, &mupdate_client_start, NULL);
        if (r == 0) {
            pthread_detach(t);
        } else {
            syslog(LOG_ERR, "could not start client thread");
            return EC_SOFTWARE;
        }

        /* Wait until they sync the database.  Condition waits may return
         * spuriously, so the predicate has to be re-tested in a loop. */
        pthread_mutex_lock(&synced_mutex);
        while (!synced)
            pthread_cond_wait(&synced_cond, &synced_mutex);
        pthread_mutex_unlock(&synced_mutex);
    } else {
        r = pthread_create(&t, NULL, &mupdate_placebo_kick_start, NULL);
        if (r == 0) {
            pthread_detach(t);
        } else {
            syslog(LOG_ERR, "could not start placebo kick thread");
            return EC_SOFTWARE;
        }

        mupdate_ready();
    }

    /* Now create the worker thread pool */
    for (i = 0; i < workers_to_start; i++) {
        r = pthread_create(&t, NULL, &thread_main, NULL);
        if (r == 0) {
            pthread_detach(t);
        } else {
            /* was mislabelled "client thread" */
            syslog(LOG_ERR, "could not start worker thread");
            return EC_SOFTWARE;
        }
    }

    return 0;
}
/* Called by service API to shut down the service */
void
service_abort
(
int
error
)
{
shut_down
(
error
);
}
/*
 * Log the message 's' and shut the service down with exit status 'code'.
 *
 * A static guard records the code of the first call; if fatal() is
 * entered again while that guard is non-zero, the process exits at once
 * without logging or calling shut_down() again.
 */
EXPORTED void fatal(const char *s, int code)
{
    static int first_fatal_code = 0;

    if (first_fatal_code) {
        /* already handling a fatal error: bail out immediately */
        exit(code);
    }
    first_fatal_code = code;

    syslog(LOG_ERR, "%s", s);
    shut_down(code);

    /* NOTREACHED */
    exit(code); /* shut up GCC */
}
/* Require end-of-line at the current parse position: if 'ch' is CR, read
 * one more character from (c)->pin into 'ch'; unless 'ch' is then LF, jump
 * to the 'extraargs' label, which must exist in the calling function. */
#define CHECKNEWLINE(c, ch) do { if ((ch) == '\r') (ch)=prot_getc((c)->pin); \
                       if ((ch) != '\n') goto extraargs; } while (0)
/*
 * Read and execute commands from connection 'c' until no more input is
 * pending, the client logs out, or the connection is lost.
 *
 * The first command is read in whatever blocking mode the stream is in,
 * because the caller already knows input is waiting.  Each further command
 * is preceded by a non-blocking one-character probe.  Every command is
 * "<tag> <command> [args...]"; the command name is normalised to an initial
 * capital followed by lower case before dispatch.  The error labels at the
 * end of the switch send the matching BAD response.
 *
 * The stream's blocking mode is restored and pending output is flushed
 * before returning.
 *
 * Returns DOCMD_OK when input ran out, DOCMD_CONN_FINISHED after Logout or
 * when the connection was lost.
 */
static mupdate_docmd_result_t docmd(struct conn *c)
{
    mupdate_docmd_result_t ret = DOCMD_OK;
    int ch;
    int was_blocking = prot_IS_BLOCKING(c->pin);
    char *p;

    /* We know we have input, so skip the check below.
     * Note that we MUST skip this nonblocking check in order to properly
     * catch connections that have timed out.
     */
    goto cmd;

 nextcmd:
    /* First we do a check for input */
    prot_NONBLOCK(c->pin);
    ch = prot_getc(c->pin);

    if (ch == EOF && errno == EAGAIN) {
        /* no input from client */
        goto done;
    } else if (ch == EOF) {
        goto lost_conn;
    } else {
        /* there's input waiting, put back our character */
        prot_ungetc(ch, c->pin);
    }

    /* Set it back to blocking so we don't get half a word */
    prot_BLOCK(c->pin);

 cmd:
    ch = getword(c->pin, &(c->tag));
    if (ch == EOF) goto lost_conn;

    if (ch != ' ') {
        prot_printf(c->pout, "* BAD \"Need command\"\r\n");
        eatline(c->pin, ch);
        goto nextcmd;
    }

    /* parse command name */
    ch = getword(c->pin, &(c->cmd));
    if (ch == EOF) {
        goto lost_conn;
    } else if (!c->cmd.s[0]) {
        prot_printf(c->pout, "%s BAD \"Null command\"\r\n", c->tag.s);
        eatline(c->pin, ch);
        goto nextcmd;
    }

    /* normalise to "Xxxxx" so the strcmp()s below are case-insensitive */
    if (Uislower(c->cmd.s[0])) {
        c->cmd.s[0] = toupper((unsigned char) c->cmd.s[0]);
    }
    for (p = &(c->cmd.s[1]); *p; p++) {
        if (Uisupper(*p)) *p = tolower((unsigned char) *p);
    }

    switch (c->cmd.s[0]) {
    case 'A':
        if (!strcmp(c->cmd.s, "Authenticate")) {
            int opt = 0;

            if (ch != ' ') goto missingargs;
            ch = getstring(c->pin, c->pout, &(c->arg1));
            if (ch == ' ') {
                /* optional initial client response */
                ch = getstring(c->pin, c->pout, &(c->arg2));
                opt = 1;
            }
            CHECKNEWLINE(c, ch);

            if (c->userid) {
                prot_printf(c->pout,
                            "%s BAD \"already authenticated\"\r\n",
                            c->tag.s);
                goto nextcmd;
            }

            cmd_authenticate(c, c->tag.s, c->arg1.s,
                             opt ? c->arg2.s : NULL);
        }
        else if (!c->userid) goto nologin;
        else if (!strcmp(c->cmd.s, "Activate")) {
            if (ch != ' ') goto missingargs;
            ch = getstring(c->pin, c->pout, &(c->arg1));
            if (ch != ' ') goto missingargs;
            ch = getstring(c->pin, c->pout, &(c->arg2));
            if (ch != ' ') goto missingargs;
            ch = getstring(c->pin, c->pout, &(c->arg3));
            CHECKNEWLINE(c, ch);

            if (c->streaming) goto notwhenstreaming;
            if (!masterp) goto masteronly;

            cmd_set(c, c->tag.s, c->arg1.s, c->arg2.s,
                    c->arg3.s, SET_ACTIVE);
        }
        else goto badcmd;
        break;

#ifdef HAVE_ZLIB
    case 'C':
        if (!strcmp(c->cmd.s, "Compress")) {
            if (ch != ' ') goto missingargs;
            ch = getstring(c->pin, c->pout, &(c->arg1));
            CHECKNEWLINE(c, ch);

            cmd_compress(c, c->tag.s, c->arg1.s);
        }
        else goto badcmd;
        break;
#endif

    case 'D':
        if (!c->userid) goto nologin;
        else if (!strcmp(c->cmd.s, "Deactivate")) {
            if (ch != ' ') goto missingargs;
            ch = getstring(c->pin, c->pout, &(c->arg1));
            if (ch != ' ') goto missingargs;
            ch = getstring(c->pin, c->pout, &(c->arg2));
            CHECKNEWLINE(c, ch);

            if (c->streaming) goto notwhenstreaming;
            if (!masterp) goto masteronly;

            cmd_set(c, c->tag.s, c->arg1.s, c->arg2.s,
                    NULL, SET_DEACTIVATE);
        }
        else if (!strcmp(c->cmd.s, "Delete")) {
            if (ch != ' ') goto missingargs;
            ch = getstring(c->pin, c->pout, &(c->arg1));
            CHECKNEWLINE(c, ch);

            if (c->streaming) goto notwhenstreaming;
            if (!masterp) goto masteronly;

            cmd_set(c, c->tag.s, c->arg1.s, NULL, NULL, SET_DELETE);
        }
        else goto badcmd;
        break;

    case 'F':
        if (!c->userid) goto nologin;
        else if (!strcmp(c->cmd.s, "Find")) {
            if (ch != ' ') goto missingargs;
            ch = getstring(c->pin, c->pout, &(c->arg1));
            CHECKNEWLINE(c, ch);

            if (c->streaming) goto notwhenstreaming;

            cmd_find(c, c->tag.s, c->arg1.s, 1, 0);
        }
        else goto badcmd;
        break;

    case 'L':
        /* Logout is allowed without authentication */
        if (!strcmp(c->cmd.s, "Logout")) {
            CHECKNEWLINE(c, ch);

            prot_printf(c->pout, "%s OK \"bye-bye\"\r\n", c->tag.s);
            ret = DOCMD_CONN_FINISHED;
            goto done;
        }
        else if (!c->userid) goto nologin;
        else if (!strcmp(c->cmd.s, "List")) {
            int opt = 0;

            if (ch == ' ') {
                /* Optional partition/host prefix parameter */
                ch = getstring(c->pin, c->pout, &(c->arg1));
                opt = 1;
            }
            CHECKNEWLINE(c, ch);

            if (c->streaming) goto notwhenstreaming;

            cmd_list(c, c->tag.s, opt ? c->arg1.s : NULL);

            prot_printf(c->pout, "%s OK \"list complete\"\r\n", c->tag.s);
        }
        else goto badcmd;
        break;

    case 'N':
        if (!c->userid) goto nologin;
        else if (!strcmp(c->cmd.s, "Noop")) {
            CHECKNEWLINE(c, ch);

            if (c->streaming) {
                /* Make *very* sure we are up-to-date */
                kick_mupdate();
                sendupdates(c, 0); /* don't flush pout though */
            }

            prot_printf(c->pout, "%s OK \"Noop done\"\r\n", c->tag.s);
        }
        else goto badcmd;
        break;

    case 'R':
        if (!c->userid) goto nologin;
        else if (!strcmp(c->cmd.s, "Reserve")) {
            if (ch != ' ') goto missingargs;
            ch = getstring(c->pin, c->pout, &(c->arg1));
            if (ch != ' ') goto missingargs;
            ch = getstring(c->pin, c->pout, &(c->arg2));
            CHECKNEWLINE(c, ch);

            if (c->streaming) goto notwhenstreaming;
            if (!masterp) goto masteronly;

            cmd_set(c, c->tag.s, c->arg1.s, c->arg2.s, NULL, SET_RESERVE);
        }
        else goto badcmd;
        break;

    case 'S':
        if (!strcmp(c->cmd.s, "Starttls")) {
            CHECKNEWLINE(c, ch);

            /* XXX  discard any input pipelined after STARTTLS */
            prot_flush(c->pin);

            if (!tls_enabled()) {
                /* we don't support starttls */
                goto badcmd;
            }

            /* if we've already done SASL fail */
            if (c->userid) {
                prot_printf(c->pout,
                            "%s BAD Can't Starttls after authentication\r\n",
                            c->tag.s);
                goto nextcmd;
            }

            /* if we've already done COMPRESS fail */
            if (c->compress_done) {
                prot_printf(c->pout,
                            "%s BAD Can't Starttls after Compress\r\n",
                            c->tag.s);
                goto nextcmd;
            }

            /* check if already did a successful tls */
            if (c->tlsconn) {
                prot_printf(c->pout,
                            "%s BAD Already did a successful Starttls\r\n",
                            c->tag.s);
                goto nextcmd;
            }
            cmd_starttls(c, c->tag.s);
        }
        else goto badcmd;
        break;

    case 'U':
        if (!c->userid) goto nologin;
        else if (!strcmp(c->cmd.s, "Update")) {
            strarray_t *arg = NULL;
            int counter = 30; /* limit on number of processed hosts */

            while (ch == ' ') {
                /* Hey, look, more bits of a PARTIAL-UPDATE command */
                ch = getstring(c->pin, c->pout, &(c->arg1));
                if (c->arg1.s[0] == '\0') {
                    strarray_free(arg);
                    goto badargs;
                }
                if (counter-- == 0) {
                    strarray_free(arg);
                    goto extraargs;
                }
                if (!arg) arg = strarray_new();
                strarray_append(arg, c->arg1.s);
            }

            /* Same end-of-line test as CHECKNEWLINE, spelled out so that
             * the host list collected above is released on the error
             * paths; the macro's bare goto used to leak it. */
            if (ch == '\r') ch = prot_getc(c->pin);
            if (ch != '\n') {
                strarray_free(arg);
                goto extraargs;
            }

            if (c->streaming) {
                strarray_free(arg);
                goto notwhenstreaming;
            }

            /* NOTE(review): cmd_startupdate() presumably takes ownership
             * of 'arg' (conn_free() releases c->streaming_hosts) --
             * confirm. */
            cmd_startupdate(c, c->tag.s, arg);
        }
        else goto badcmd;
        break;

    default:
    badcmd:
        prot_printf(c->pout, "%s BAD \"Unrecognized command\"\r\n",
                    c->tag.s);
        eatline(c->pin, ch);
        break;

    extraargs:
        prot_printf(c->pout, "%s BAD \"Extra arguments\"\r\n",
                    c->tag.s);
        eatline(c->pin, ch);
        break;

    badargs:
        prot_printf(c->pout, "%s BAD \"Badly formed arguments\"\r\n",
                    c->tag.s);
        eatline(c->pin, ch);
        break;

    missingargs:
        prot_printf(c->pout, "%s BAD \"Missing arguments\"\r\n",
                    c->tag.s);
        eatline(c->pin, ch);
        break;

    notwhenstreaming:
        /* the whole line was already consumed, nothing to eat */
        prot_printf(c->pout, "%s BAD \"not legal when streaming\"\r\n",
                    c->tag.s);
        break;

    masteronly:
        prot_printf(c->pout,
                    "%s BAD \"read-only session\"\r\n",
                    c->tag.s);
        break;

    nologin:
        prot_printf(c->pout, "%s BAD Please login first\r\n", c->tag.s);
        eatline(c->pin, ch);
        break;
    }

    /* Check for more input */
    goto nextcmd;

 lost_conn:
    {
        const char *err;

        /* a plain EOF is not worth reporting */
        if ((err = prot_error(c->pin)) != NULL
            && strcmp(err, PROT_EOF_STRING)) {
            syslog(LOG_WARNING, "%s, closing connection", err);
            prot_printf(c->pout, "* BYE \"%s\"\r\n", err);
        }

        ret = DOCMD_CONN_FINISHED;
    }

 done:
    /* Restore the state of the input stream */
    if (was_blocking)
        prot_BLOCK(c->pin);
    else
        prot_NONBLOCK(c->pin);

    /* Necessary since we don't ever do a prot_read on an idle connection
     * in mupdate */
    prot_flush(c->pout);

    return ret;
}
/*
 * run for each accepted connection
 *
 * If the configured connection limit has been reached, the client is sent
 * SERVER_UNABLE_STRING (best effort, non-blocking) and the socket is
 * closed.  Otherwise the descriptor is written to conn_pipe[1] to signal
 * that a new connection is available.
 *
 * fd: the accepted client socket.
 *
 * Returns 0 normally, EC_TEMPFAIL if the descriptor could not be written
 * to the pipe.
 */
int service_main_fd(int fd,
                    int argc __attribute__((unused)),
                    char **argv __attribute__((unused)),
                    char **envp __attribute__((unused)))
{
    int flag;
    int r;

    /* First check that we can handle the new connection. */
    pthread_mutex_lock(&connection_count_mutex); /* LOCK */
    flag =
        (connection_count >= config_getint(IMAPOPT_MUPDATE_CONNECTIONS_MAX));
    pthread_mutex_unlock(&connection_count_mutex); /* UNLOCK */

    if (flag) {
        /* Do the nonblocking write, if it fails, too bad for them. */
        nonblock(fd, 1);
        /* sizeof() counts the terminating NUL, which must not go out on
         * the wire; send only the text of the response. */
        r = write(fd, SERVER_UNABLE_STRING, sizeof(SERVER_UNABLE_STRING) - 1);
        close(fd);

        syslog(LOG_ERR,
               "Server too busy, dropping connection.");

        /* best effort: the result is deliberately ignored */
        (void) r;
    } else if (write(conn_pipe[1], &fd, sizeof(fd)) == -1) {
        /* signal that a new file descriptor is available.
         * If it fails... */

        syslog(LOG_CRIT,
               "write to conn_pipe to signal new connection failed: %m");
        return EC_TEMPFAIL;
    }
    return 0;
}
/*
* Issue the capability banner
*/
static
void
dobanner
(
struct
conn
*
c
)
{
char
slavebuf
[
4096
];
const
char
*
mechs
;
int
mechcount
;
int
ret
;
/* send initial the banner + flush pout */
ret
=
sasl_listmech
(
c
->
saslconn
,
NULL
,
"* AUTH
\"
"
,
"
\"
\"
"
,
"
\"
"
,
&
mechs
,
NULL
,
&
mechcount
);
/* Add mupdate:// tag if necessary */
if
(
!
masterp
)
{
if
(
!
config_mupdate_server
)
fatal
(
"mupdate server was not specified for slave"
,
EC_TEMPFAIL
);
snprintf
(
slavebuf
,
sizeof
(
slavebuf
),
"mupdate://%s"
,
config_mupdate_server
);
}
prot_printf
(
c
->
pout
,
"%s
\r\n
"
,
(
ret
==
SASL_OK
&&
mechcount
>
0
)
?
mechs
:
"* AUTH"
);
if
(
tls_enabled
()
&&
!
c
->
tlsconn
)
{
prot_printf
(
c
->
pout
,
"* STARTTLS
\r\n
"
);
}
#ifdef HAVE_ZLIB
if
(
!
c
->
compress_done
&&
!
c
->
tls_comp
)
{
prot_printf
(
c
->
pout
,
"* COMPRESS
\"
DEFLATE
\"\r\n
"
);
}
#endif
prot_printf
(
c
->
pout
,
"* PARTIAL-UPDATE
\r\n
"
);
prot_printf
(
c
->
pout
,
"* OK MUPDATE
\"
%s
\"
\"
Cyrus Murder
\"
\"
%s
\"
\"
%s
\"\r\n
"
,
config_servername
,
cyrus_version
(),
masterp
?
"(master)"
:
slavebuf
);
prot_flush
(
c
->
pout
);
}
/*
 * The main thread loop
 *
 * Each worker repeatedly: registers itself as idle, waits (up to 60
 * seconds at a time) to become the single listener, selects on the
 * idle connections plus conn_pipe, and then either greets a new
 * connection (dobanner) or runs one command on an existing connection
 * (docmd).  Before doing the work it hands the listener role to another
 * thread and, if needed, starts additional workers.
 *
 * A worker leaves the loop when there are already enough idle workers
 * or when it timed out waiting for the listener role while more than
 * the minimum number of spare workers exist.
 *
 * 'rock' is unused.  Always returns NULL.
 */
/* Note that You Must Lock Listen mutex before idle worker mutex,
 * though you can lock them individually too */
static void *thread_main(void *rock __attribute__((unused)))
{
    struct conn *C; /* used for loops */
    struct conn *currConn = NULL; /* the connection we care about currently */
    struct protgroup *protin = protgroup_new(PROTGROUP_SIZE_DEFAULT);
    struct protgroup *protout = NULL;
    struct timeval now;
    struct timespec timeout;
    int need_workers, too_many;
    int max_worker_flag;
    int do_a_command;
    int send_a_banner;
    int connflag;
    int new_fd;
    int ret = 0;
    struct conn *ni;

    /* Lock Worker Count Mutex */
    pthread_mutex_lock(&worker_count_mutex); /* LOCK */
    /* Change total number of workers */
    worker_count++;
    syslog(LOG_DEBUG,
           "New worker thread started, for a total of %d", worker_count);
    /* Unlock Worker Count Mutex */
    pthread_mutex_unlock(&worker_count_mutex); /* UNLOCK */

    /* This is a big infinite loop */
    while (1) {
        send_a_banner = do_a_command = 0;

        pthread_mutex_lock(&idle_worker_mutex);
        /* If we are over the limit on idle threads, die. */
        max_worker_flag = (idle_worker_count >=
                           config_getint(IMAPOPT_MUPDATE_WORKERS_MAXSPARE));
        /* Increment Idle Workers */
        if (!max_worker_flag) idle_worker_count++;
        pthread_mutex_unlock(&idle_worker_mutex);

        if (max_worker_flag) goto worker_thread_done;

    retry_lock:
        /* Lock Listen Mutex - If locking takes more than 60 seconds,
         * kill off this thread.  Ideally this is a FILO queue */
        pthread_mutex_lock(&listener_mutex); /* LOCK */
        ret = 0;
        while (listener_lock && ret != ETIMEDOUT) {
            gettimeofday(&now, NULL);
            timeout.tv_sec = now.tv_sec + 60;
            timeout.tv_nsec = now.tv_usec * 1000;
            ret = pthread_cond_timedwait(&listener_cond,
                                         &listener_mutex,
                                         &timeout);
        }
        if (!ret) {
            /* Set listener lock until we decide what to do */
            listener_lock = 1;
        }
        pthread_mutex_unlock(&listener_mutex); /* UNLOCK */

        if (ret == ETIMEDOUT) {
            pthread_mutex_lock(&idle_worker_mutex); /* LOCK */
            if (idle_worker_count <=
                config_getint(IMAPOPT_MUPDATE_WORKERS_MINSPARE)) {
                pthread_mutex_unlock(&idle_worker_mutex); /* UNLOCK */
                /* below number of spare workers, try to get the lock again */
                goto retry_lock;
            } else {
                /* Decrement Idle Worker Count */
                idle_worker_count--;
                pthread_mutex_unlock(&idle_worker_mutex); /* UNLOCK */

                syslog(LOG_DEBUG,
                       "Thread timed out waiting for listener_lock");
                goto worker_thread_done;
            }
        }

        signals_poll();

        /* Check if we are ready for connections, if not, wait */
        pthread_mutex_lock(&ready_for_connections_mutex); /* LOCK */
        /* are we ready to take connections? */
        while (!ready_for_connections) {
            pthread_cond_wait(&ready_for_connections_cond,
                              &ready_for_connections_mutex);
        }
        pthread_mutex_unlock(&ready_for_connections_mutex); /* UNLOCK */

        connflag = 0;

        /* Reset protin to all zeros (to preserve memory allocation) */
        protgroup_reset(protin);

        /* Clear protout if needed */
        protgroup_free(protout);
        protout = NULL;

        /* Build list of idle protstreams */
        pthread_mutex_lock(&idle_connlist_mutex); /* LOCK */
        for (C = idle_connlist; C; C = C->next_idle) {
            assert(C->idle);

            protgroup_insert(protin, C->pin);
        }
        pthread_mutex_unlock(&idle_connlist_mutex); /* UNLOCK */

        /* Select on Idle Conns + conn_pipe */
        if (prot_select(protin, conn_pipe[0],
                        &protout, &connflag, NULL) == -1) {
            syslog(LOG_ERR, "prot_select() failed in thread_main: %m");
            fatal("prot_select() failed in thread_main", EC_TEMPFAIL);
        }

        /* we've got work to do */
        pthread_mutex_lock(&idle_worker_mutex); /* LOCK */
        idle_worker_count--;
        pthread_mutex_unlock(&idle_worker_mutex); /* UNLOCK */

        /* If we've been signaled to be unready, drop all current connections
         * in the idle list */
        pthread_mutex_lock(&ready_for_connections_mutex); /* LOCK */
        if (!ready_for_connections) {
            pthread_mutex_unlock(&ready_for_connections_mutex); /* UNLOCK */
            /* Free all connections on idle_connlist.  Note that
             * any connection not currently on the idle_connlist will
             * instead be freed when they drop out of their docmd() below */
            pthread_mutex_lock(&idle_connlist_mutex); /* LOCK */
            for (C = idle_connlist; C; C = ni) {
                /* save the link first: conn_free() is called on C below */
                ni = C->next_idle;

                prot_printf(C->pout,
                            "* BYE \"no longer ready for connections\"\r\n");

                C->idle = 0;
                conn_free(C);
            }
            idle_connlist = NULL;
            pthread_mutex_unlock(&idle_connlist_mutex); /* UNLOCK */

            /* send_a_banner and do_a_command are both still 0 here */
            goto nextlistener;
        }
        pthread_mutex_unlock(&ready_for_connections_mutex); /* UNLOCK */

        if (connflag) {
            /* read the fd from the pipe, if needed */
            if (read(conn_pipe[0], &new_fd, sizeof(new_fd)) == -1) {
                syslog(LOG_CRIT,
                       "read from conn_pipe for new connection failed: %m");
                fatal("conn_pipe read failed", EC_TEMPFAIL);
            }
        } else {
            new_fd = NO_NEW_CONNECTION;
        }

        if (new_fd != NO_NEW_CONNECTION) {
            /* new_fd indicates a new connection */
            currConn = conn_new(new_fd);
            if (currConn)
                send_a_banner = 1;
        } else if (protout) {
            /* Handle existing connection, we'll need to pull it off
             * the idle_connlist */
            struct protstream *ptmp;
            struct conn **prev;

            pthread_mutex_lock(&idle_connlist_mutex); /* LOCK */

            /* Grab the first connection out of the ready set, and use it */
            ptmp = protgroup_getelement(protout, 0);
            assert(ptmp);
            currConn = ptmp->userdata;
            assert(currConn);

            assert(currConn->idle);
            currConn->idle = 0;
            /* unlink currConn; 'prev' points at the link that refers to C */
            for (C = idle_connlist, prev = &(idle_connlist);
                 C;
                 prev = &(C->next_idle), C = C->next_idle) {
                if (C == currConn) {
                    *prev = C->next_idle;
                    C->next_idle = NULL;
                    break;
                }
            }

            pthread_mutex_unlock(&idle_connlist_mutex); /* UNLOCK */

            do_a_command = 1;
        }

        /*
         * If this worker will do any real work, we'll want to make sure
         * there are sufficient additional workers while we're busy.
         */
        if (send_a_banner || do_a_command) {
            pthread_mutex_lock(&idle_worker_mutex); /* LOCK */
            need_workers = config_getint(IMAPOPT_MUPDATE_WORKERS_MINSPARE)
                           - idle_worker_count;
            pthread_mutex_unlock(&idle_worker_mutex); /* UNLOCK */

            pthread_mutex_lock(&worker_count_mutex); /* LOCK */
            if (need_workers > 0) {
                too_many = (need_workers + worker_count) -
                    config_getint(IMAPOPT_MUPDATE_WORKERS_MAX);
                if (too_many > 0) need_workers -= too_many;
            }
            pthread_mutex_unlock(&worker_count_mutex); /* UNLOCK */

            /* Do we need a new worker (or two, or three...)?
             * (are we allowed to create one?) */
            while (need_workers > 0) {
                pthread_t t;
                int r = pthread_create(&t, NULL, &thread_main, NULL);
                if (r == 0) {
                    pthread_detach(t);
                } else {
                    syslog(LOG_ERR,
                           "could not start a new worker thread (not fatal)");
                }
                /* Even if we fail to create the new thread, keep going */
                need_workers--;
            }
        }

    nextlistener:
        /* Let another listener in */
        pthread_mutex_lock(&listener_mutex);
        assert(listener_lock);
        listener_lock = 0;
        pthread_cond_signal(&listener_cond);
        pthread_mutex_unlock(&listener_mutex);

        /* Do work in this thread, if needed */
        if (send_a_banner) {
            dobanner(currConn);
        } else if (do_a_command) {
            assert(currConn);

            if (docmd(currConn) == DOCMD_CONN_FINISHED) {
                conn_free(currConn);
                /* continue to top of loop here since we won't be adding
                 * this back to the idle list */
                continue;
            }

            /* Are we allowed to continue serving data? */
            pthread_mutex_lock(&ready_for_connections_mutex); /* LOCK */
            if (!ready_for_connections) {
                pthread_mutex_unlock(&ready_for_connections_mutex); /* UNLOCK */
                /* Address the connection we just served explicitly,
                 * rather than through the loop scratch variable C, which
                 * is only equal to currConn if the unlink loop above
                 * found it (and is NULL otherwise). */
                prot_printf(currConn->pout,
                            "* BYE \"no longer ready for connections\"\r\n");
                conn_free(currConn);
                /* continue to top of loop here since we won't be adding
                 * this back to the idle list */
                continue;
            }
            pthread_mutex_unlock(&ready_for_connections_mutex); /* UNLOCK */
        } /* done handling command */

        if (send_a_banner || do_a_command) {
            /* We did work in this thread, so we need to [re-]add the
             * connection to the idle list and signal the current listener */

            pthread_mutex_lock(&idle_connlist_mutex); /* LOCK */
            currConn->idle = 1;
            currConn->next_idle = idle_connlist;
            idle_connlist = currConn;
            pthread_mutex_unlock(&idle_connlist_mutex); /* UNLOCK */

            /* Signal to our caller that we should add something
             * to select() on, since this connection is ready again */
            if (write(conn_pipe[1], &NO_NEW_CONNECTION,
                      sizeof(NO_NEW_CONNECTION)) == -1) {
                fatal("write to conn_pipe to signal docmd done failed",
                      EC_TEMPFAIL);
            }
        }

    } /* while(1) */

 worker_thread_done:
    /* Remove this worker from the pool */
    /* Note that workers exiting the loop above should NOT be counted
     * in the idle_worker_count */
    pthread_mutex_lock(&worker_count_mutex); /* LOCK */
    worker_count--;
    pthread_mutex_lock(&idle_worker_mutex); /* LOCK */
    syslog(LOG_DEBUG,
           "Worker thread finished, for a total of %d (%d spare)",
           worker_count, idle_worker_count);
    pthread_mutex_unlock(&idle_worker_mutex); /* UNLOCK */
    pthread_mutex_unlock(&worker_count_mutex); /* UNLOCK */

    protgroup_free(protin);
    protgroup_free(protout);

    return NULL;
}
/*
 * Initialise and open the mailbox list while holding mailboxes_mutex.
 * The mutex is taken here, so the caller must NOT already hold it.
 */
static void database_init(void)
{
    pthread_mutex_lock(&mailboxes_mutex); /* LOCK */

    mboxlist_init(0);
    mboxlist_open(NULL);

    pthread_mutex_unlock(&mailboxes_mutex); /* UNLOCK */
}
/*
 * Log a change to the database.  The database must be locked by the
 * caller.
 *
 * mb    - the change to record; mb->t selects the operation:
 *         SET_ACTIVE and SET_RESERVE insert a remote entry,
 *         SET_DELETE removes the entry named mb->mailbox.
 *         SET_DEACTIVATE is never a stored state and aborts.
 * mytid - transaction handle passed through to the mboxlist calls.
 *
 * 'mb' keeps ownership of all of its strings.
 */
static void database_log(const struct mbent *mb, struct txn **mytid)
{
    mbentry_t *mbentry = NULL;

    mbentry = mboxlist_entry_create();

    /* The entry only BORROWS these pointers for the duration of the
     * mboxlist call below; they still belong to 'mb' (and mb->acl is
     * storage embedded in the struct, not a separate allocation). */
    mbentry->name = mb->mailbox;
    mbentry->server = mb->server;
    mbentry->acl = mb->acl;

    switch (mb->t) {
    case SET_ACTIVE:
        mbentry->mbtype = 0;
        mboxlist_insertremote(mbentry, mytid);
        break;

    case SET_RESERVE:
        mbentry->mbtype = MBTYPE_RESERVE;
        mboxlist_insertremote(mbentry, mytid);
        break;

    case SET_DELETE:
        mboxlist_deleteremote(mb->mailbox, mytid);
        break;

    case SET_DEACTIVATE:
        /* SET_DEACTIVATE is not a real value that an actual
           mailbox can have! */
        abort();
    }

    /* Detach the borrowed strings before releasing the entry, so that
     * mboxlist_entry_free() can never free memory owned by 'mb'.
     * NOTE(review): mboxlist_entry_free() is defined elsewhere; it is
     * assumed to release an entry's string members -- confirm. */
    mbentry->name = NULL;
    mbentry->server = NULL;
    mbentry->acl = NULL;

    mboxlist_entry_free(&mbentry);
}
/*
 * Look up 'name' in the database.  The database must be locked.
 *
 * Returns a freshly allocated struct mbent describing the mailbox, or
 * NULL when 'name' is NULL or the lookup fails.  An entry whose mbtype
 * has MBTYPE_RESERVE set is reported as SET_RESERVE with an empty acl;
 * anything else is reported as SET_ACTIVE with a copy of the acl.  The
 * server field is "server!partition" when both are known, otherwise
 * whichever one is present, otherwise the empty string.
 *
 * Passing a NULL pool means the result is allocated with xmalloc /
 * xstrdup; a non-NULL pool means the mpool functions are used instead.
 *
 * This could probably be more efficient and avoid some copies.
 */
static struct mbent *database_lookup(const char *name, struct mpool *pool)
{
    mbentry_t *mbentry = NULL;
    struct mbent *out;
    char *location;     /* temporary server[!partition] string */
    size_t need;        /* bytes to allocate for 'out' */
    int reserved;

    if (!name) return NULL;

    if (mboxlist_lookup_allow_reserved(name, &mbentry, NULL))
        return NULL;

    /* the acl is stored inline at the end of the struct, so the
     * allocation is sized for it */
    reserved = (mbentry->mbtype & MBTYPE_RESERVE) != 0;
    need = sizeof(struct mbent) + (reserved ? 1 : strlen(mbentry->acl));

    out = pool ? mpool_malloc(pool, need) : xmalloc(need);

    if (reserved) {
        out->t = SET_RESERVE;
        out->acl[0] = '\0';
    } else {
        out->t = SET_ACTIVE;
        strcpy(out->acl, mbentry->acl);
    }

    /* build the combined location string */
    if (mbentry->server) {
        if (mbentry->partition) {
            location = strconcat(mbentry->server, "!",
                                 mbentry->partition, NULL);
        } else {
            location = xstrdup(mbentry->server);
        }
    } else if (mbentry->partition) {
        location = xstrdup(mbentry->partition);
    } else {
        location = xstrdup("");
    }

    if (pool) {
        out->mailbox = mpool_strdup(pool, name);
        out->server = mpool_strdup(pool, location);
    } else {
        out->mailbox = xstrdup(name);
        out->server = xstrdup(location);
    }

    free(location);
    mboxlist_entry_free(&mbentry);

    return out;
}
static
void
cmd_authenticate
(
struct
conn
*
C
,
const
char
*
tag
,
const
char
*
mech
,
const
char
*
clientstart
)
{
int
r
,
sasl_result
;
const
void
*
val
;
int
failedloginpause
;
r
=
saslserver
(
C
->
saslconn
,
mech
,
clientstart
,
""
,
""
,
""
,
C
->
pin
,
C
->
pout
,
&
sasl_result
,
NULL
);
if
(
r
)
{
const
char
*
errorstring
=
NULL
;
switch
(
r
)
{
case
IMAP_SASL_CANCEL
:
prot_printf
(
C
->
pout
,
"%s NO Client canceled authentication
\r\n
"
,
tag
);
break
;
case
IMAP_SASL_PROTERR
:
errorstring
=
prot_error
(
C
->
pin
);
prot_printf
(
C
->
pout
,
"%s NO Error reading client response: %s
\r\n
"
,
tag
,
errorstring
?
errorstring
:
""
);
break
;
default
:
failedloginpause
=
config_getint
(
IMAPOPT_FAILEDLOGINPAUSE
);
if
(
failedloginpause
!=
0
)
{
sleep
(
failedloginpause
);
}
syslog
(
LOG_ERR
,
"badlogin: %s %s %s"
,
C
->
clienthost
,
mech
,
sasl_errdetail
(
C
->
saslconn
));
prot_printf
(
C
->
pout
,
"%s NO
\"
%s
\"\r\n
"
,
tag
,
sasl_errstring
((
r
==
SASL_NOUSER
?
SASL_BADAUTH
:
r
),
NULL
,
NULL
));
}
reset_saslconn
(
C
);
return
;
}
/* Successful Authentication */
r
=
sasl_getprop
(
C
->
saslconn
,
SASL_USERNAME
,
&
val
);
if
(
r
!=
SASL_OK
)
{
prot_printf
(
C
->
pout
,
"%s NO
\"
SASL Error
\"\r\n
"
,
tag
);
reset_saslconn
(
C
);
return
;
}
C
->
userid
=
(
char
*
)
val
;
syslog
(
LOG_NOTICE
,
"login: %s %s %s%s %s"
,
C
->
clienthost
,
C
->
userid
,
mech
,
C
->
tlsconn
?
"+TLS"
:
""
,
"User logged in"
);
prot_printf
(
C
->
pout
,
"%s OK
\"
Authenticated
\"\r\n
"
,
tag
);
prot_setsasl
(
C
->
pin
,
C
->
saslconn
);
prot_setsasl
(
C
->
pout
,
C
->
saslconn
);
C
->
logfd
=
telemetry_log
(
C
->
userid
,
C
->
pin
,
C
->
pout
,
1
);
return
;
}
/*
 * Log the update out to anyone who is in our updatelist.
 *
 * INVARIANT: caller MUST hold mailboxes_mutex.
 *
 * mailbox    - name of the mailbox that changed
 * oldserver  - the previous value of the server in this update
 *              (may be NULL)
 * thisserver - the current value of the mailbox's server (may be NULL)
 *
 * For every connection on updatelist a pending record naming 'mailbox'
 * is appended to that connection's plist, unless the connection has a
 * streaming_hosts filter and neither server matches it.
 */
static void log_update(const char *mailbox,
                       const char *oldserver,
                       const char *thisserver)
{
    struct conn *upc;

    for (upc = updatelist; upc != NULL; upc = upc->updatelist_next) {
        struct pending *p;

        /* this might need to be inside the mutex, but I doubt it */
        if (upc->streaming_hosts
            && (!oldserver ||
                strarray_find(upc->streaming_hosts, oldserver, 0) < 0)
            && (!thisserver ||
                strarray_find(upc->streaming_hosts, thisserver, 0) < 0)) {
            /* No Match! Continue! */
            continue;
        }

        /* Allocate the record only once we know it will be queued;
         * allocating before the filter leaked it on every non-match. */
        p = (struct pending *) xmalloc(sizeof(struct pending));
        p->next = NULL;
        strlcpy(p->mailbox, mailbox, sizeof(p->mailbox));

        /* append to this connection's pending list */
        pthread_mutex_lock(&upc->m);

        if (upc->plist == NULL) {
            upc->plist = upc->ptail = p;
        } else {
            upc->ptail->next = p;
            upc->ptail = p;
        }

        pthread_mutex_unlock(&upc->m);
    }
}
/*
 * Apply a mailbox change requested on connection C and send the tagged
 * response.
 *
 * tag     - command tag echoed in the response
 * mailbox - mailbox name being changed
 * server  - new server (optionally "server!partition"); may be NULL
 * acl     - new acl; may be NULL
 * t       - requested operation.  SET_DEACTIVATE is only accepted for a
 *           currently active mailbox and is stored as SET_RESERVE.
 *
 * The change is written to the database and queued for streaming
 * clients while mailboxes_mutex is held; the response is written only
 * after the mutex has been released.
 */
static void cmd_set(struct conn *C,
                    const char *tag, const char *mailbox,
                    const char *server, const char *acl, enum settype t)
{
    struct mbent *m;
    char *oldserver = NULL;
    char *thisserver = NULL;
    char *tmp;

    /* Hold any output that we need to do */
    enum {
        EXISTS, NOTACTIVE, DOESNTEXIST, ISOK, NOOUTPUT
    } msg = NOOUTPUT;

    syslog(LOG_DEBUG, "cmd_set(fd:%d, %s)", C->fd, mailbox);

    pthread_mutex_lock(&mailboxes_mutex); /* LOCK */

    m = database_lookup(mailbox, NULL);
    if (m && t == SET_RESERVE) {
        if (config_mupdate_config == IMAP_ENUM_MUPDATE_CONFIG_STANDARD) {
            /* failed; mailbox already exists */
            msg = EXISTS;
            goto done;
        }
        /* otherwise do nothing (local create on master) */
    }

    if ((!m || m->t != SET_ACTIVE) && t == SET_DEACTIVATE) {
        /* failed; mailbox not currently active */
        msg = NOTACTIVE;
        goto done;
    } else if (t == SET_DEACTIVATE) {
        t = SET_RESERVE;
    }

    if (t == SET_DELETE) {
        if (!m) {
            if (config_mupdate_config == IMAP_ENUM_MUPDATE_CONFIG_STANDARD) {
                /* failed; mailbox doesn't exist */
                msg = DOESNTEXIST;
                goto done;
            }
            /* otherwise do nothing (local delete on master) */
        } else {
            /* m keeps its own server string here, so take a copy */
            oldserver = xstrdup(m->server);

            /* do the deletion */
            m->t = SET_DELETE;
        }
    } else {
        /* Ownership of the old server string moves to 'oldserver':
         * both branches below replace m->server with a new copy, and
         * 'oldserver' is freed at 'done'. */
        if (m)
            oldserver = m->server;

        if (m && (!acl || strlen(acl) < strlen(m->acl))) {
            /* change what's already there -- the acl is smaller */
            m->server = xstrdup(server);
            if (acl) strcpy(m->acl, acl);
            else m->acl[0] = '\0';

            m->t = t;
        } else {
            struct mbent *newm;
            /* The looked-up entry (if any) owns a mailbox string that
             * is replaced below; remember it so it can be released
             * instead of leaked.  It must be captured before xrealloc,
             * because when m is NULL the new struct is uninitialised. */
            char *stale_mailbox = m ? m->mailbox : NULL;

            /* allocate new mailbox */
            if (acl) {
                newm = xrealloc(m, sizeof(struct mbent) + strlen(acl));
            } else {
                newm = xrealloc(m, sizeof(struct mbent) + 1);
            }
            newm->mailbox = xstrdup(mailbox);
            free(stale_mailbox);
            newm->server = xstrdup(server);
            if (acl) {
                strcpy(newm->acl, acl);
            } else {
                newm->acl[0] = '\0';
            }

            newm->t = t;

            /* re-scope */
            m = newm;
        }
    }

    /* write to disk */
    if (m) database_log(m, NULL);

    /* only the host part (before any '!') is used for update filtering */
    if (oldserver) {
        tmp = strchr(oldserver, '!');
        if (tmp) *tmp = '\0';
    }

    if (server) {
        thisserver = xstrdup(server);
        tmp = strchr(thisserver, '!');
        if (tmp) *tmp = '\0';
    }

    /* post pending changes */
    log_update(mailbox, oldserver, thisserver);

    msg = ISOK;
 done:
    if (thisserver) free(thisserver);
    if (oldserver) free(oldserver);

    free_mbent(m);
    pthread_mutex_unlock(&mailboxes_mutex); /* UNLOCK */

    /* Delay output until here to avoid blocking while holding
     * mailboxes_mutex */
    switch (msg) {
    case EXISTS:
        prot_printf(C->pout, "%s NO \"mailbox already exists\"\r\n", tag);
        break;
    case NOTACTIVE:
        prot_printf(C->pout, "%s NO \"mailbox not currently active\"\r\n",
                    tag);
        break;
    case DOESNTEXIST:
        prot_printf(C->pout, "%s NO \"mailbox doesn't exist\"\r\n", tag);
        break;
    case ISOK:
        prot_printf(C->pout, "%s OK \"done\"\r\n", tag);
        break;
    default:
        break;
    }
}
/*
 * Report the current state of 'mailbox' on connection C.
 *
 * An active mailbox is sent as a MAILBOX line (name, server, acl), a
 * reserved one as a RESERVE line (name, server).  When there is nothing
 * to report and 'send_delete' is set, a DELETE line is sent instead.
 * When 'send_ok' is set, a tagged OK follows.
 */
static void cmd_find(struct conn *C, const char *tag, const char *mailbox,
                     int send_ok, int send_delete)
{
    struct mbent *entry;
    int is_active;
    int is_reserved;

    syslog(LOG_DEBUG, "cmd_find(fd:%d, %s)", C->fd, mailbox);

    /* Only hold the mutex around database_lookup,
     * since the mbent stays valid even if the database changes,
     * and we don't want to block on network I/O */
    pthread_mutex_lock(&mailboxes_mutex); /* LOCK */
    entry = database_lookup(mailbox, NULL);
    pthread_mutex_unlock(&mailboxes_mutex); /* UNLOCK */

    is_active = entry && entry->t == SET_ACTIVE;
    is_reserved = entry && entry->t == SET_RESERVE;

    if (is_active) {
        prot_printf(C->pout, "%s MAILBOX {" SIZE_T_FMT "+}\r\n%s"
                    " {" SIZE_T_FMT "+}\r\n%s {" SIZE_T_FMT "+}\r\n%s\r\n",
                    tag,
                    strlen(entry->mailbox), entry->mailbox,
                    strlen(entry->server), entry->server,
                    strlen(entry->acl), entry->acl);
    } else if (is_reserved) {
        prot_printf(C->pout, "%s RESERVE {" SIZE_T_FMT "+}\r\n%s"
                    " {" SIZE_T_FMT "+}\r\n%s\r\n",
                    tag,
                    strlen(entry->mailbox), entry->mailbox,
                    strlen(entry->server), entry->server);
    } else if (send_delete) {
        /* not found, if needed, send a delete */
        prot_printf(C->pout, "%s DELETE {" SIZE_T_FMT "+}\r\n%s\r\n",
                    tag, strlen(mailbox), mailbox);
    }

    free_mbent(entry);

    if (send_ok) {
        prot_printf(C->pout, "%s OK \"Search completed\"\r\n", tag);
    }
}
/* Callback for cmd_startupdate to be passed to mboxlist_findall. */
/* Requires that C->streaming be set to the tag to respond with */
static
int
sendupdate
(
char
*
name
,
int
matchlen
__attribute__
((
unused
)),
int
maycreate
__attribute__
((
unused
)),
void
*
rock
)
{
struct
conn
*
C
=
(
struct
conn
*
)
rock
;
struct
mbent
*
m
;
char
*
server
=
NULL
;
if
(
!
C
)
return
-1
;
m
=
database_lookup
(
name
,
NULL
);
if
(
!
m
)
return
-1
;
if
(
!
C
->
list_prefix
||
!
strncmp
(
m
->
server
,
C
->
list_prefix
,
C
->
list_prefix_len
))
{
/* Either there is not a prefix to test, or we matched it */
char
*
tmp
;
if
(
C
->
streaming_hosts
)
{
server
=
xstrdup
(
m
->
server
);
tmp
=
strchr
(
server
,
'!'
);
if
(
tmp
)
*
tmp
=
'\0'
;
}
if
(
!
C
->
streaming_hosts
||
strarray_find
(
C
->
streaming_hosts
,
server
,
0
)
>=
0
)
{
switch
(
m
->
t
)
{
case
SET_ACTIVE
:
prot_printf
(
C
->
pout
,
"%s MAILBOX {"
SIZE_T_FMT
"+}
\r\n
%s"
" {"
SIZE_T_FMT
"+}
\r\n
%s {"
SIZE_T_FMT
"+}
\r\n
%s
\r\n
"
,
C
->
streaming
,
strlen
(
m
->
mailbox
),
m
->
mailbox
,
strlen
(
m
->
server
),
m
->
server
,
strlen
(
m
->
acl
),
m
->
acl
);
break
;
case
SET_RESERVE
:
prot_printf
(
C
->
pout
,
"%s RESERVE {"
SIZE_T_FMT
"+}
\r\n
%s"
" {"
SIZE_T_FMT
"+}
\r\n
%s
\r\n
"
,
C
->
streaming
,
strlen
(
m
->
mailbox
),
m
->
mailbox
,
strlen
(
m
->
server
),
m
->
server
);
break
;
case
SET_DELETE
:
/* deleted item in the list !?! */
case
SET_DEACTIVATE
:
/* SET_DEACTIVATE is not a real value! */
abort
();
}
}
}
if
(
server
)
free
(
server
);
free_mbent
(
m
);
return
0
;
}
/*
 * Handle LIST: send every mailbox entry, optionally restricted to those
 * whose server starts with 'host_prefix' (NULL means no restriction).
 *
 * The per-entry output is produced by the sendupdate() callback, which
 * reads the tag from C->streaming and the prefix from C->list_prefix;
 * both are set up here and cleared again afterwards.
 */
static void cmd_list(struct conn *C, const char *tag, const char *host_prefix)
{
    char pattern[] = "*";

    /* List operations can result in a lot of output, let's do this
     * with the prot layer nonblocking so we don't hold the mutex forever*/
    prot_NONBLOCK(C->pout);

    pthread_mutex_lock(&mailboxes_mutex); /* LOCK */

    /* since this isn't valid when streaming, just use the same callback */
    C->streaming = tag;
    C->list_prefix = host_prefix;
    C->list_prefix_len = C->list_prefix ? strlen(C->list_prefix) : 0;

    mboxlist_findall(NULL, pattern, 1, NULL,
                     NULL, sendupdate, (void *) C);

    /* drop the borrowed tag and prefix again */
    C->streaming = NULL;
    C->list_prefix = NULL;
    C->list_prefix_len = 0;

    pthread_mutex_unlock(&mailboxes_mutex); /* UNLOCK */

    prot_BLOCK(C->pout);
    prot_flush(C->pout);
}
/*
 * we've registered this connection for streaming, and every X seconds
 * this will be invoked.  note that we always send out updates as soon
 * as we get a noop: that resets this counter back
 *
 * 'rock' is the struct conn being streamed to.  The same event 'ev' is
 * returned; the stream argument is unused.
 */
static struct prot_waitevent *sendupdates_evt(struct protstream *s __attribute__((unused)),
                                              struct prot_waitevent *ev,
                                              void *rock)
{
    /* push out whatever is pending and flush immediately;
     * 'sendupdates()' will update when we next trigger */
    sendupdates((struct conn *) rock, 1);

    return ev;
}
/*
 * Handle UPDATE: register connection C for streaming updates.
 *
 * C is pushed onto updatelist, its streaming tag is set to a copy of
 * 'tag' and 'partial' becomes its streaming_hosts filter.  The whole
 * database is then dumped through sendupdate(), a tagged OK is sent,
 * and a wait event is scheduled so that sendupdates_evt() runs
 * update_wait seconds from now.
 */
static void cmd_startupdate(struct conn *C, const char *tag,
                            strarray_t *partial)
{
    char pattern[] = "*";

    /* The initial dump of the database can result in a lot of data,
     * let's do this nonblocking */
    prot_NONBLOCK(C->pout);

    pthread_mutex_lock(&mailboxes_mutex); /* LOCK */

    /* indicate interest in updates */
    C->updatelist_next = updatelist;
    updatelist = C;
    C->streaming = xstrdup(tag);
    C->streaming_hosts = partial;

    /* dump initial list */
    mboxlist_findall(NULL, pattern, 1, NULL,
                     NULL, sendupdate, (void *) C);

    pthread_mutex_unlock(&mailboxes_mutex); /* UNLOCK */

    prot_printf(C->pout, "%s OK \"streaming starts\"\r\n", tag);

    prot_BLOCK(C->pout);
    prot_flush(C->pout);

    /* schedule our first update */
    C->ev = prot_addwaitevent(C->pin, time(NULL) + update_wait,
                              sendupdates_evt, C);
}
/*
 * send out any pending updates.
 * if 'flushnow' is set, flush the output buffer
 *
 * The pending list is detached from C under C->m, then each record is
 * reported through cmd_find() and freed.  Afterwards the wait event is
 * pushed back by update_wait seconds.
 */
static void sendupdates(struct conn *C, int flushnow)
{
    struct pending *item;
    struct pending *following;

    /* just grab the update list and release the lock */
    pthread_mutex_lock(&C->m);
    item = C->plist;
    C->plist = NULL;
    C->ptail = NULL;
    pthread_mutex_unlock(&C->m);

    for (; item != NULL; item = following) {
        /* read the link before the record is freed */
        following = item->next;

        /* notify just like a FIND - except enable sending of DELETE
         * notifications */
        cmd_find(C, C->streaming, item->mailbox, 0, 1);

        free(item);
    }

    /* reschedule event for 'update_wait' seconds */
    C->ev->mark = time(NULL) + update_wait;

    if (flushnow) {
        prot_flush(C->pout);
    }
}
#ifdef HAVE_SSL
/*
 * Handle STARTTLS on connection C.
 *
 * Initialises the TLS server engine, tells the client to begin the
 * negotiation, performs it, passes the negotiated strength and the
 * external auth id on to SASL (remembering both in C->saslprops),
 * attaches the TLS layer to both protstreams and finally reissues the
 * capability banner.  Engine or negotiation failure yields a tagged NO;
 * a failing sasl_setprop() is fatal.
 */
static void cmd_starttls(struct conn *C, const char *tag)
{
    int rc;
    char *auth_id;
    sasl_ssf_t ssf;
    /* SASL and openssl have different ideas about whether ssf is signed */
    int *layerp = (int *) &ssf;

    rc = tls_init_serverengine("mupdate",
                               5,        /* depth to verify */
                               1,        /* can client auth? */
                               1);       /* TLS only? */
    if (rc == -1) {
        syslog(LOG_ERR, "error initializing TLS");

        prot_printf(C->pout, "%s NO Error initializing TLS\r\n", tag);

        return;
    }

    prot_printf(C->pout, "%s OK Begin TLS negotiation now\r\n", tag);
    /* must flush our buffers before starting tls */
    prot_flush(C->pout);

    rc = tls_start_servertls(C->pin->fd, /* read */
                             C->pout->fd, /* write */
                             180, /* 3 minutes */
                             layerp,
                             &auth_id,
                             &C->tlsconn);

    /* if error */
    if (rc == -1) {
        prot_printf(C->pout, "%s NO Starttls negotiation failed\r\n",
                    tag);
        syslog(LOG_NOTICE, "STARTTLS negotiation failed: %s",
               C->clienthost);
        return;
    }

    /* tell SASL about the negotiated layer */
    rc = sasl_setprop(C->saslconn, SASL_SSF_EXTERNAL, &ssf);
    if (rc != SASL_OK) {
        fatal("sasl_setprop() failed: cmd_starttls()", EC_TEMPFAIL);
    }
    C->saslprops.ssf = ssf;

    rc = sasl_setprop(C->saslconn, SASL_AUTH_EXTERNAL, auth_id);
    if (rc != SASL_OK) {
        fatal("sasl_setprop() failed: cmd_starttls()", EC_TEMPFAIL);
    }

    /* replace any previously remembered auth id with a copy of this one */
    if (C->saslprops.authid) {
        free(C->saslprops.authid);
        C->saslprops.authid = NULL;
    }
    if (auth_id) {
        C->saslprops.authid = xstrdup(auth_id);
    }

    /* tell the prot layer about our new layers */
    prot_settls(C->pin, C->tlsconn);
    prot_settls(C->pout, C->tlsconn);

#if (OPENSSL_VERSION_NUMBER >= 0x0090800fL)
    C->tls_comp = (void *) SSL_get_current_compression(C->tlsconn);
#endif

    /* Reissue capability banner */
    dobanner(C);
}
#else
/* Built without SSL: reaching this function is a fatal software error. */
void cmd_starttls(struct conn *C, const char *tag)
{
    fatal("cmd_starttls() executed, but starttls isn't implemented!",
          EC_SOFTWARE);
}
#endif /* HAVE_SSL */
#ifdef HAVE_ZLIB
/*
 * Handle COMPRESS on connection C.
 *
 * Refused with BAD when compression was already enabled by an earlier
 * COMPRESS, and with NO when TLS compression is active, when 'alg' is
 * not DEFLATE (case-insensitive), or when the zlib major version seen
 * at run time differs from the one compiled against.  Otherwise a
 * tagged OK is sent and compression is switched on for both streams.
 */
static void cmd_compress(struct conn *C, const char *tag, const char *alg)
{
    if (C->compress_done) {
        prot_printf(C->pout,
                    "%s BAD DEFLATE active via COMPRESS\r\n", tag);
        return;
    }

#if defined(HAVE_SSL) && (OPENSSL_VERSION_NUMBER >= 0x0090800fL)
    if (C->tls_comp) {
        prot_printf(C->pout,
                    "%s NO %s active via TLS\r\n",
                    tag, SSL_COMP_get_name(C->tls_comp));
        return;
    }
#endif

    if (strcasecmp(alg, "DEFLATE")) {
        prot_printf(C->pout,
                    "%s NO Unknown COMPRESS algorithm: %s\r\n", tag, alg);
        return;
    }

    /* compare the first character of the compile-time and run-time
     * zlib version strings */
    if (ZLIB_VERSION[0] != zlibVersion()[0]) {
        prot_printf(C->pout,
                    "%s NO Error initializing %s (incompatible zlib version)\r\n",
                    tag, alg);
        return;
    }

    /* the OK is queued before compression is switched on */
    prot_printf(C->pout,
                "%s OK %s active\r\n", tag, alg);

    /* enable (de)compression for the prot layer */
    prot_setcompress(C->pin);
    prot_setcompress(C->pout);

    C->compress_done = 1;
}
#else
/* Built without zlib: reaching this function is a fatal software error. */
void cmd_compress(struct conn *C, const char *tag, const char *alg)
{
    fatal("cmd_compress() executed, but COMPRESS isn't implemented!",
          EC_SOFTWARE);
}
#endif /* HAVE_ZLIB */
/*
 * Terminate the process with exit status `code`.
 * The in_shutdown flag is raised before cyrus_done() is called, and
 * exit() is called last, so the function never returns.
 */
void shut_down(int code) __attribute__((noreturn));

void shut_down(int code)
{
    in_shutdown = 1;

    cyrus_done();

    exit(code);
}
/*
 * Reset the given connection's sasl_conn_t to a sane state.
 *
 * The existing SASL context is disposed of and a new "mupdate" server
 * context is created in its place.  The properties remembered in
 * c->saslprops (remote/local IP:port, external SSF, external auth id)
 * are then re-applied, together with the NOANONYMOUS security properties.
 *
 * Returns SASL_OK on success, otherwise the first non-OK SASL result.
 */
static int reset_saslconn(struct conn *c)
{
    int rc;
    sasl_security_properties_t *props = NULL;

    sasl_dispose(&c->saslconn);

    /* do initialization typical of service_main */
    rc = sasl_server_new("mupdate", config_servername,
                         NULL, NULL, NULL,
                         NULL, 0, &c->saslconn);
    if (rc != SASL_OK) return rc;

    if (c->saslprops.ipremoteport) {
        rc = sasl_setprop(c->saslconn, SASL_IPREMOTEPORT,
                          c->saslprops.ipremoteport);
        if (rc != SASL_OK) return rc;
    }

    if (c->saslprops.iplocalport) {
        rc = sasl_setprop(c->saslconn, SASL_IPLOCALPORT,
                          c->saslprops.iplocalport);
        if (rc != SASL_OK) return rc;
    }

    props = mysasl_secprops(SASL_SEC_NOANONYMOUS);
    rc = sasl_setprop(c->saslconn, SASL_SEC_PROPS, props);
    if (rc != SASL_OK) return rc;
    /* end of service_main initialization excepting SSF */

    /* If we have TLS/SSL info, set it */
    if (c->saslprops.ssf) {
        rc = sasl_setprop(c->saslconn, SASL_SSF_EXTERNAL,
                          &c->saslprops.ssf);
        if (rc != SASL_OK) return rc;
    }

    if (c->saslprops.authid) {
        rc = sasl_setprop(c->saslconn, SASL_AUTH_EXTERNAL,
                          c->saslprops.authid);
        if (rc != SASL_OK) return rc;
    }
    /* End TLS/SSL Info */

    return SASL_OK;
}
int
cmd_change
(
struct
mupdate_mailboxdata
*
mdata
,
const
char
*
rock
,
void
*
context
__attribute__
((
unused
)))
{
struct
mbent
*
m
=
NULL
;
char
*
oldserver
=
NULL
;
char
*
thisserver
=
NULL
;
char
*
tmp
;
enum
settype
t
=
-1
;
int
ret
=
0
;
if
(
!
mdata
||
!
rock
||
!
mdata
->
mailbox
)
return
1
;
pthread_mutex_lock
(
&
mailboxes_mutex
);
/* LOCK */
if
(
!
strncmp
(
rock
,
"DELETE"
,
6
))
{
m
=
database_lookup
(
mdata
->
mailbox
,
NULL
);
if
(
!
m
)
{
syslog
(
LOG_DEBUG
,
"attempt to delete unknown mailbox %s"
,
mdata
->
mailbox
);
/* Mailbox doesn't exist - this isn't as fatal as you might
* think. */
/* ret = -1; */
goto
done
;
}
m
->
t
=
t
=
SET_DELETE
;
oldserver
=
xstrdup
(
m
->
server
);
}
else
{
m
=
database_lookup
(
mdata
->
mailbox
,
NULL
);
if
(
m
)
oldserver
=
m
->
server
;
if
(
m
&&
(
!
mdata
->
acl
||
strlen
(
mdata
->
acl
)
<
strlen
(
m
->
acl
)))
{
/* change what's already there */
/* old m->server freed when oldserver is freed */
m
->
server
=
xstrdup
(
mdata
->
server
);
if
(
mdata
->
acl
)
strcpy
(
m
->
acl
,
mdata
->
acl
);
else
m
->
acl
[
0
]
=
'\0'
;
if
(
!
strncmp
(
rock
,
"MAILBOX"
,
6
))
{
m
->
t
=
t
=
SET_ACTIVE
;
}
else
if
(
!
strncmp
(
rock
,
"RESERVE"
,
7
))
{
m
->
t
=
t
=
SET_RESERVE
;
}
else
{
syslog
(
LOG_DEBUG
,
"bad mupdate command in cmd_change: %s"
,
rock
);
ret
=
1
;
goto
done
;
}
}
else
{
struct
mbent
*
newm
;
if
(
m
)
{
free
(
m
->
mailbox
);
/* m->server freed when oldserver freed */
}
/* allocate new mailbox */
if
(
mdata
->
acl
)
{
newm
=
xrealloc
(
m
,
sizeof
(
struct
mbent
)
+
strlen
(
mdata
->
acl
));
}
else
{
newm
=
xrealloc
(
m
,
sizeof
(
struct
mbent
)
+
1
);
}
newm
->
mailbox
=
xstrdup
(
mdata
->
mailbox
);
newm
->
server
=
xstrdup
(
mdata
->
server
);
if
(
mdata
->
acl
)
{
strcpy
(
newm
->
acl
,
mdata
->
acl
);
}
else
{
newm
->
acl
[
0
]
=
'\0'
;
}
if
(
!
strncmp
(
rock
,
"MAILBOX"
,
6
))
{
newm
->
t
=
t
=
SET_ACTIVE
;
}
else
if
(
!
strncmp
(
rock
,
"RESERVE"
,
7
))
{
newm
->
t
=
t
=
SET_RESERVE
;
}
else
{
syslog
(
LOG_DEBUG
,
"bad mupdate command in cmd_change: %s"
,
rock
);
ret
=
1
;
goto
done
;
}
/* Bring it back into scope */
m
=
newm
;
}
}
/* write to disk */
database_log
(
m
,
NULL
);
if
(
oldserver
)
{
tmp
=
strchr
(
oldserver
,
'!'
);
if
(
tmp
)
*
tmp
=
'\0'
;
}
if
(
mdata
->
server
)
{
thisserver
=
xstrdup
(
mdata
->
server
);
tmp
=
strchr
(
thisserver
,
'!'
);
if
(
tmp
)
*
tmp
=
'\0'
;
}
/* post pending changes to anyone we are talking to */
log_update
(
mdata
->
mailbox
,
oldserver
,
thisserver
);
done
:
if
(
oldserver
)
free
(
oldserver
);
if
(
thisserver
)
free
(
thisserver
);
free_mbent
(
m
);
pthread_mutex_unlock
(
&
mailboxes_mutex
);
/* UNLOCK */
return
ret
;
}
/* State handed to the resync callbacks through their rock/context pointer */
struct sync_rock {
    /* pool that queued mbent entries are allocated from */
    struct mpool *pool;

    /* queue that the callbacks append entries to */
    struct mbent_queue *boxes;
};
/* Read a series of MAILBOX and RESERVE commands and tack them onto a
* queue */
static
int
cmd_resync
(
struct
mupdate_mailboxdata
*
mdata
,
const
char
*
rock
,
void
*
context
)
{
struct
sync_rock
*
r
=
(
struct
sync_rock
*
)
context
;
struct
mbent_queue
*
remote_boxes
=
r
->
boxes
;
struct
mbent
*
newm
=
NULL
;
if
(
!
mdata
||
!
rock
||
!
mdata
->
mailbox
||
!
remote_boxes
)
return
1
;
/* allocate new mailbox */
if
(
mdata
->
acl
)
{
newm
=
mpool_malloc
(
r
->
pool
,
sizeof
(
struct
mbent
)
+
strlen
(
mdata
->
acl
));
}
else
{
newm
=
mpool_malloc
(
r
->
pool
,
sizeof
(
struct
mbent
)
+
1
);
}
newm
->
mailbox
=
mpool_strdup
(
r
->
pool
,
mdata
->
mailbox
);
newm
->
server
=
mpool_strdup
(
r
->
pool
,
mdata
->
server
);
if
(
mdata
->
acl
)
{
strcpy
(
newm
->
acl
,
mdata
->
acl
);
}
else
{
newm
->
acl
[
0
]
=
'\0'
;
}
if
(
!
strncmp
(
rock
,
"MAILBOX"
,
6
))
{
newm
->
t
=
SET_ACTIVE
;
}
else
if
(
!
strncmp
(
rock
,
"RESERVE"
,
7
))
{
newm
->
t
=
SET_RESERVE
;
}
else
{
syslog
(
LOG_NOTICE
,
"bad mupdate command in cmd_resync: %s"
,
rock
);
return
1
;
}
/* Insert onto queue */
newm
->
next
=
NULL
;
*
(
remote_boxes
->
tail
)
=
newm
;
remote_boxes
->
tail
=
&
(
newm
->
next
);
return
0
;
}
/*
 * Callback for mupdate_synchronize to be passed to mboxlist_findall.
 *
 * name - mailbox name to look up with database_lookup().
 * rock - struct sync_rock * carrying the destination queue and the pool
 *        handed to database_lookup().
 *
 * Appends the looked-up entry to the queue.  Returns 1 when the rock has
 * no queue, 0 otherwise (a name with no database entry is not an error).
 */
static int sync_findall_cb(char *name,
                           int matchlen __attribute__((unused)),
                           int maycreate __attribute__((unused)),
                           void *rock)
{
    struct sync_rock *state = (struct sync_rock *) rock;
    struct mbent_queue *queue = (struct mbent_queue *) state->boxes;
    struct mbent *entry;

    if (queue == NULL) return 1;

    entry = database_lookup(name, state->pool);

    /* If it doesn't exist, fine... */
    if (entry != NULL) {
        /* link the entry in at the tail of the queue */
        entry->next = NULL;
        *(queue->tail) = entry;
        queue->tail = &(entry->next);
    }

    return 0;
}
/*
 * Request the full mailbox list from the mupdate master and collect it
 * into remote_boxes.
 *
 * handle       - authenticated connection to the master; the call fails
 *                when it is NULL or SASL has not completed.
 * remote_boxes - queue that is reset and then filled by cmd_resync().
 * pool         - pool that the queued entries are allocated from.
 *
 * Returns 0 on success, after switching the input stream to non-blocking
 * mode.  Returns 1 on failure; remote_boxes may then hold a partial list.
 */
int mupdate_synchronize_remote(mupdate_handle *handle,
                               struct mbent_queue *remote_boxes,
                               struct mpool *pool)
{
    struct sync_rock rock;

    if (!handle || !handle->saslcompleted) return 1;

    rock.pool = pool;

    /* ask mupdate master for updates and set nonblocking */
    prot_printf(handle->conn->out, "U01 UPDATE\r\n");

    syslog(LOG_NOTICE,
           "scarfing mailbox list from master mupdate server");

    remote_boxes->head = NULL;
    remote_boxes->tail = &(remote_boxes->head);

    rock.boxes = remote_boxes;

    /* If there is a fatal error, return, other errors ignore */
    if (mupdate_scarf(handle, cmd_resync, &rock, 1, NULL) != 0) {
        /* Nothing to release here: cmd_resync() allocates every queued
         * entry from `pool`.  The original code walked the list at this
         * point without freeing or changing anything. */
        return 1;
    }

    /* Make socket nonblocking now */
    prot_NONBLOCK(handle->conn->in);

    return 0;
}
int
mupdate_synchronize
(
struct
mbent_queue
*
remote_boxes
,
struct
mpool
*
pool
)
{
struct
mbent_queue
local_boxes
;
struct
mbent
*
l
,
*
r
;
struct
sync_rock
rock
;
char
pattern
[]
=
{
'*'
,
'\0'
};
struct
txn
*
tid
=
NULL
;
int
ret
=
0
;
int
err
=
0
;
rock
.
pool
=
pool
;
/* Note that this prevents other people from running an UPDATE against
* us for the duration. this is a GOOD THING */
pthread_mutex_lock
(
&
mailboxes_mutex
);
/* LOCK */
syslog
(
LOG_NOTICE
,
"synchronizing mailbox list with master mupdate server"
);
local_boxes
.
head
=
NULL
;
local_boxes
.
tail
=
&
(
local_boxes
.
head
);
rock
.
boxes
=
&
local_boxes
;
mboxlist_findall
(
NULL
,
pattern
,
1
,
NULL
,
NULL
,
sync_findall_cb
,
(
void
*
)
&
rock
);
/* Traverse both lists, compare the names */
/* If they match, ensure that server and acl are correct, if so,
move on, if not, fix them */
/* If the local is before the next remote, delete it */
/* If the next remote is before theis local, insert it and try again */
for
(
l
=
local_boxes
.
head
,
r
=
remote_boxes
->
head
;
l
&&
r
;
l
=
local_boxes
.
head
,
r
=
remote_boxes
->
head
)
{
int
ret
=
strcmp
(
l
->
mailbox
,
r
->
mailbox
);
if
(
!
ret
)
{
/* Match */
if
(
l
->
t
!=
r
->
t
||
strcmp
(
l
->
server
,
r
->
server
)
||
strcmp
(
l
->
acl
,
r
->
acl
))
{
/* Something didn't match, replace it */
/*
* If this is a locally hosted mailbox, don't make a
* change, just warn.
*/
if
((
config_mupdate_config
==
IMAP_ENUM_MUPDATE_CONFIG_UNIFIED
)
&&
(
strchr
(
l
->
server
,
'!'
)
==
NULL
))
{
syslog
(
LOG_ERR
,
"local mailbox %s wrong in mailbox list"
,
l
->
mailbox
);
err
++
;
}
else
{
mbentry_t
*
mbentry
=
mboxlist_entry_create
();
mbentry
->
name
=
r
->
mailbox
;
mbentry
->
mbtype
=
(
r
->
t
==
SET_RESERVE
?
MBTYPE_RESERVE
:
0
);
mbentry
->
server
=
r
->
server
;
mbentry
->
acl
=
r
->
acl
;
mboxlist_insertremote
(
mbentry
,
&
tid
);
mboxlist_entry_free
(
&
mbentry
);
}
}
/* Okay, dump these two */
local_boxes
.
head
=
l
->
next
;
remote_boxes
->
head
=
r
->
next
;
}
else
if
(
ret
<
0
)
{
/* Local without corresponding remote, delete it */
/*
* In a unified murder, we don't want to delete locally
* hosted mailboxes during mupdate's resync process.
* If that sort of operation appears necessary, it
* probably requires an operator to review it --
* ctl_mboxlist is the right place to fix the kind
* of configuration error implied.
*
* A similar problem exists when the server thinks
* it is locally hosting a mailbox, but mupdate master
* thinks it's somewhere else.
*/
if
((
config_mupdate_config
==
IMAP_ENUM_MUPDATE_CONFIG_UNIFIED
)
&&
(
strchr
(
l
->
server
,
'!'
)
==
NULL
))
{
syslog
(
LOG_ERR
,
"local mailbox %s not in mailbox list"
,
l
->
mailbox
);
err
++
;
}
else
{
mboxlist_deleteremote
(
l
->
mailbox
,
&
tid
);
}
local_boxes
.
head
=
l
->
next
;
}
else
/* (ret > 0) */
{
/* Remote without corresponding local, insert it */
mbentry_t
*
mbentry
=
mboxlist_entry_create
();
mbentry
->
name
=
r
->
mailbox
;
mbentry
->
mbtype
=
(
r
->
t
==
SET_RESERVE
?
MBTYPE_RESERVE
:
0
);
mbentry
->
server
=
r
->
server
;
mbentry
->
acl
=
r
->
acl
;
mboxlist_insertremote
(
mbentry
,
&
tid
);
mboxlist_entry_free
(
&
mbentry
);
remote_boxes
->
head
=
r
->
next
;
}
}
if
(
l
&&
!
r
)
{
/* we have more deletes to do */
while
(
l
)
{
if
((
config_mupdate_config
==
IMAP_ENUM_MUPDATE_CONFIG_UNIFIED
)
&&
(
strchr
(
l
->
server
,
'!'
)
==
NULL
))
{
syslog
(
LOG_ERR
,
"local mailbox %s not in mailbox list"
,
l
->
mailbox
);
err
++
;
}
else
{
mboxlist_deleteremote
(
l
->
mailbox
,
&
tid
);
}
local_boxes
.
head
=
l
->
next
;
l
=
local_boxes
.
head
;
}
}
else
if
(
r
&&
!
l
)
{
/* we have more inserts to do */
while
(
r
)
{
mbentry_t
*
mbentry
=
mboxlist_entry_create
();
mbentry
->
name
=
r
->
mailbox
;
mbentry
->
mbtype
=
(
r
->
t
==
SET_RESERVE
?
MBTYPE_RESERVE
:
0
);
mbentry
->
server
=
r
->
server
;
mbentry
->
acl
=
r
->
acl
;
mboxlist_insertremote
(
mbentry
,
&
tid
);
mboxlist_entry_free
(
&
mbentry
);
remote_boxes
->
head
=
r
->
next
;
r
=
remote_boxes
->
head
;
}
}
if
(
tid
)
mboxlist_commit
(
tid
);
/* All up to date! */
if
(
err
)
{
syslog
(
LOG_ERR
,
"mailbox list synchronization NOT complete (%d) errors"
,
err
);
}
else
{
syslog
(
LOG_NOTICE
,
"mailbox list synchronization complete"
);
}
pthread_mutex_unlock
(
&
mailboxes_mutex
);
/* UNLOCK */
return
ret
;
}
/*
 * Set the `synced` flag and wake every thread waiting on synced_cond.
 * The flag is changed and the broadcast is issued with synced_mutex held.
 */
void mupdate_signal_db_synced(void)
{
    pthread_mutex_lock(&synced_mutex);

    synced = 1;
    pthread_cond_broadcast(&synced_cond);

    pthread_mutex_unlock(&synced_mutex);
}
/*
 * Set ready_for_connections and wake every thread waiting on
 * ready_for_connections_cond, all under ready_for_connections_mutex.
 * If the flag is already set, the condition is logged at LOG_CRIT and
 * fatal() is called.
 */
void mupdate_ready(void)
{
    pthread_mutex_lock(&ready_for_connections_mutex);

    if (ready_for_connections) {
        /* calling this twice is reported as a fatal error */
        syslog(LOG_CRIT, "mupdate_ready called when already ready");
        fatal("mupdate_ready called when already ready", EC_TEMPFAIL);
    }

    ready_for_connections = 1;
    pthread_cond_broadcast(&ready_for_connections_cond);

    pthread_mutex_unlock(&ready_for_connections_mutex);
}
/*
 * Signal unreadiness.  Next active worker will kill off all idle
 * connections; any non-idle connection will die off when it leaves
 * docmd().
 *
 * This function itself only logs a notice and clears
 * ready_for_connections while holding ready_for_connections_mutex.
 */
void mupdate_unready(void)
{
    pthread_mutex_lock(&ready_for_connections_mutex);

    syslog(LOG_NOTICE, "unready for connections");
    ready_for_connections = 0;

    pthread_mutex_unlock(&ready_for_connections_mutex);
}
/*
 * Used to free malloc'd mbent's (not for mpool'd mbents).
 * Releases the server string, the mailbox string and then the entry
 * itself.  A NULL pointer is accepted and ignored.
 */
void free_mbent(struct mbent *p)
{
    if (p != NULL) {
        free(p->server);
        free(p->mailbox);
        free(p);
    }
}
File Metadata
Details
Attached
Mime Type
text/x-c
Expires
Sat, Apr 4, 2:04 AM (1 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18822074
Default Alt Text
mupdate.c (66 KB)
Attached To
Mode
R111 cyrus-imapd
Attached
Detach File
Event Timeline