define('QUIRK_CHUNK_UPDATE', 0x0001);
-echo "Moodle chat daemon v1.0 (\$Id$)\n\n";
+echo "Moodle chat daemon v1.0 on PHP ".phpversion()." (\$Id$)\n\n";
/// Set up all the variables we need /////////////////////////////////////
}
class ChatDaemon {
- var $_readytogo = false;
- var $_logfile = false;
- var $_trace_to_console = true;
- var $_trace_to_stdout = true;
- var $_logfile_name = 'chatd.log';
-
- var $conn_ufo = array(); // Connections not identified yet
+ var $_resetsocket = false;
+ var $_readytogo = false;
+ var $_logfile = false;
+ var $_trace_to_console = true;
+ var $_trace_to_stdout = true;
+ var $_logfile_name = 'chatd.log';
+ var $_last_idle_poll = 0;
+
+ var $conn_ufo = array(); // Connections not identified yet
var $conn_side = array(); // Sessions with sidekicks waiting for the main connection to be processed
var $conn_half = array(); // Sessions that have valid connections but not all of them
var $conn_sets = array(); // Sessions with complete connection sets sets
var $sets_info = array(); // Keyed by sessionid exactly like conn_sets, one of these for each of those
-
- var $message_queue = array(); // Holds messages that we haven't committed to the DB yet
+ var $chatrooms = array(); // Keyed by chatid, holding arrays of data
function ChatDaemon() {
- // Check the STDOUT constant
- $this->_trace_to_stdout = defined('STDOUT');
$this->_trace_level = E_ALL ^ E_USER_NOTICE;
$this->_pcntl_exists = function_exists('pcntl_fork');
$this->_time_rest_socket = 20;
$this->_beepsoundsrc = $GLOBALS['CFG']->wwwroot.'/mod/chat/beep.wav';
$this->_freq_update_records = 15;
+ $this->_freq_poll_idle_chat = 35;
+ $this->_stdout = fopen('php://stdout', 'w');
+ if($this->_stdout) {
+ // Avoid double traces for everything
+ $this->_trace_to_console = false;
+ }
+ }
+
+ function poll_idle_chats($now) {
+ $this->trace('Polling chats to detect disconnected users');
+ if(!empty($this->chatrooms)) {
+ foreach($this->chatrooms as $chatid => $chatroom) {
+ if(!empty($chatroom['users'])) {
+ foreach($chatroom['users'] as $sessionid => $userid) {
+ // We will be polling each user as required
+ if($this->sets_info[$sessionid]['chatuser']->lastmessageping < $this->_last_idle_poll) {
+ // This user hasn't been polled since his last message
+ if($this->write_data($this->conn_sets[$sessionid][CHAT_CONNECTION_CHANNEL], '<!-- poll -->') === false) {
+ // User appears to have disconnected
+ $this->disconnect_session($sessionid);
+ }
+ }
+ }
+ }
+ }
+ }
+ $this->_last_idle_poll = $now;
}
function query_start() {
// Emit the message to wherever we should
if($this->_trace_to_stdout) {
- fwrite(STDOUT, $message);
+ fwrite($this->_stdout, $message);
+ fflush($this->_stdout);
}
if($this->_trace_to_console) {
echo $message;
}
}
+ function write_data($connection, $text) {
+ $written = @socket_write($connection, $text, strlen($text));
+ if($written === false) {
+ // $this->trace("socket_write() failed: reason: " . socket_strerror(socket_last_error($connection)));
+ return false;
+ }
+ return true;
+
+ // Enclosing the above code inside this blocks makes sure that
+ // "a socket write operation will not block". I 'm not so sure
+ // if this is needed, as we have a nonblocking socket anyway.
+ // If trouble starts to creep up, we 'll restore this.
+// $check_socket = array($connection);
+// $socket_changed = socket_select($read = NULL, $check_socket, $except = NULL, 0, 0);
+// if($socket_changed > 0) {
+//
+// // ABOVE CODE GOES HERE
+//
+// }
+// return false;
+ }
+
function update_lastmessageping($sessionid, $time = NULL) {
// TODO: this can and should be written as a single UPDATE query
if(empty($this->sets_info[$sessionid])) {
return false;
}
- $now = time();
if(empty($time)) {
- $time = $now;
+ $time = time();
}
// We 'll be cheating a little, and NOT updating lastmessageping
$this->sets_info[$sessionid]['chatuser']->lastping = $time;
// This will set it just fine for bookkeeping purposes.
- if($now - $this->sets_info[$sessionid]['lastinfocommit'] > $this->_freq_update_records) {
+ if($time - $this->sets_info[$sessionid]['lastinfocommit'] > $this->_freq_update_records) {
// commit to permanent storage
// $this->trace('Committing volatile lastmessageping for session '.$sessionid);
- $this->sets_info[$sessionid]['lastinfocommit'] = $now;
+ $this->sets_info[$sessionid]['lastinfocommit'] = $time;
update_record('chat_users', $this->sets_info[$sessionid]['chatuser']);
}
return true;
$header .= "\n";
// That's enough headers for one lousy dummy response
- chat_socket_write($handle, $header);
+ $this->write_data($handle, $header);
// All done
break;
$header .= "Cache-Control: no-cache, must-revalidate\n";
$header .= "Expires: Wed, 4 Oct 1978 09:32:45 GMT\n";
$header .= "Content-Length: ".strlen($content)."\n";
- $header .= "Refresh: $CFG->chat_refresh_userlist; URL=http://$CFG->chat_serverhost:$CFG->chat_serverport/?win=users&".
+
+ // The refresh value is 2 seconds higher than the configuration variable because we are doing JS refreshes all the time.
+ // However, if the JS doesn't work for some reason, we still want to refresh once in a while.
+ $header .= "Refresh: ".(intval($CFG->chat_refresh_userlist) + 2)."; URL=http://$CFG->chat_serverhost:$CFG->chat_serverport/?win=users&".
"chat_sid=".$sessionid."&groupid=".$this->sets_info[$sessionid]['groupid']."\n";
$header .= "\n";
// That's enough headers for one lousy dummy response
$this->trace('writing users http response to handle '.$handle);
- chat_socket_write($handle, $header . $content);
+ $this->write_data($handle, $header . $content);
/*
$header = "HTTP/1.1 200 OK\n";
$header .= "Expires: Wed, 4 Oct 1978 09:32:45 GMT\n";
$header .= "\n";
$this->trace('writing users http response to handle '.$handle);
- chat_socket_write($handle, $header);
+ $this->write_data($handle, $header);
*/
break;
case CHAT_SIDEKICK_MESSAGE:
if($this->sets_info[$sessionid]['lastmessageindex'] >= $messageindex) {
// We have already broadcasted that!
- $this->trace('discarding message with stale index');
+ // $this->trace('discarding message with stale index');
break;
}
else {
$header .= "\n";
// That's enough headers for one lousy dummy response
- chat_socket_write($handle, $header);
+ $this->write_data($handle, $header);
// All done
break;
'lastmessageindex' => 0,
'courseid' => $course->id,
'chatuser' => $chatuser,
- 'chatid' => $chatuser->chatid,
+ 'chatid' => $chat->id,
'user' => $user,
- 'userid' => $chatuser->userid,
+ 'userid' => $user->id,
'groupid' => $groupid,
'lang' => $lang,
'quirks' => $customdata['quirks']
);
- $this->trace('QUIRKS value for this connection is '.$customdata['quirks']);
+ // If we know nothing about this chatroom, initialize it and add the user
+ if(!isset($this->chatrooms[$chat->id]['users'])) {
+ $this->chatrooms[$chat->id]['users'] = array($sessionid => $user->id);
+ }
+ else {
+ // Otherwise just add the user
+ $this->chatrooms[$chat->id]['users'][$sessionid] = $user->id;
+ }
+
+ // $this->trace('QUIRKS value for this connection is '.$customdata['quirks']);
$this->dismiss_half($sessionid, false);
- chat_socket_write($this->conn_sets[$sessionid][CHAT_CONNECTION_CHANNEL], $CHAT_HTMLHEAD_JS);
- $this->trace('Connection accepted: '.$this->conn_sets[$sessionid][CHAT_CONNECTION_CHANNEL].', SID: '.$sessionid.' UID: '.$chatuser->userid.' GID: '.intval($groupid));
+ $this->write_data($this->conn_sets[$sessionid][CHAT_CONNECTION_CHANNEL], $CHAT_HTMLHEAD_JS);
+ $this->trace('Connection accepted: '.$this->conn_sets[$sessionid][CHAT_CONNECTION_CHANNEL].', SID: '.$sessionid.' UID: '.$chatuser->userid.' GID: '.intval($groupid), E_USER_WARNING);
// Finally, broadcast the "entered the chat" message
@socket_close($handle);
}
}
+ $chatroom = $this->sets_info[$sessionid]['chatid'];
+ $userid = $this->sets_info[$sessionid]['userid'];
unset($this->conn_sets[$sessionid]);
unset($this->sets_info[$sessionid]);
+ unset($this->chatrooms[$chatroom]['users'][$sessionid]);
return true;
}
if($ufo->handle == $handle) {
unset($this->conn_ufo[$id]);
if($disconnect) {
- chat_socket_write($handle, "You don't seem to be a valid client.\n");
+ $this->write_data($handle, "You don't seem to be a valid client.\n");
socket_shutdown($handle);
socket_close($handle);
}
return true;
}
+ $now = time();
+
+ // First of all, mark this chatroom as having had activity now
+ $this->chatrooms[$message->chatid]['lastactivity'] = $now;
+
foreach($this->sets_info as $sessionid => $info) {
// We need to get handles from users that are in the same chatroom, same group
if($info['chatid'] == $message->chatid &&
$this->trace('Delivering message "'.$output->text.'" to '.$this->conn_sets[$sessionid][CHAT_CONNECTION_CHANNEL]);
if($output->beep) {
- chat_socket_write($this->conn_sets[$sessionid][CHAT_CONNECTION_CHANNEL], '<embed src="'.$this->_beepsoundsrc.'" autostart="true" hidden="true" />');
+ $this->write_data($this->conn_sets[$sessionid][CHAT_CONNECTION_CHANNEL], '<embed src="'.$this->_beepsoundsrc.'" autostart="true" hidden="true" />');
}
if($info['quirks'] & QUIRK_CHUNK_UPDATE) {
$output->html .= $GLOBALS['CHAT_DUMMY_DATA'];
}
- if(!chat_socket_write($this->conn_sets[$sessionid][CHAT_CONNECTION_CHANNEL], $output->html)) {
-
- // Send failed! We must now disconnect/forget about the user FIRST
- // and THEN broadcast a message to all others... otherwise, infinite recursion.
-
- delete_records('chat_users', 'sid', $sessionid);
- $msg = &New stdClass;
- $msg->chatid = $info['chatid'];
- $msg->userid = $info['userid'];
- $msg->groupid = 0;
- $msg->system = 1;
- $msg->message = 'exit';
- $msg->timestamp = time();
-
- $this->trace('Client socket write failed, destroying uid '.$info['userid'].' with SID '.$sessionid);
- insert_record('chat_messages', $msg);
-
- // *************************** IMPORTANT
- //
- // Kill him BEFORE broadcasting, otherwise we 'll get infinite recursion!
- //
- // **********************************************************************
- $latesender = $this->sets_info[$sessionid]['user'];
- $this->dismiss_set($sessionid);
- $this->message_broadcast($msg, $latesender);
+ if(!$this->write_data($this->conn_sets[$sessionid][CHAT_CONNECTION_CHANNEL], $output->html)) {
+ $this->disconnect_session($sessionid);
}
//$this->trace('Sent to UID '.$this->sets_info[$sessionid]['userid'].': '.$message->text_);
}
}
}
- function message_commit() {
+ function disconnect_session($sessionid) {
+ $info = $this->sets_info[$sessionid];
+
+ delete_records('chat_users', 'sid', $sessionid);
+ $msg = &New stdClass;
+ $msg->chatid = $info['chatid'];
+ $msg->userid = $info['userid'];
+ $msg->groupid = 0;
+ $msg->system = 1;
+ $msg->message = 'exit';
+ $msg->timestamp = time();
+
+ $this->trace('User has disconnected, destroying uid '.$info['userid'].' with SID '.$sessionid, E_USER_WARNING);
+ insert_record('chat_messages', $msg);
+
+ // *************************** IMPORTANT
+ //
+ // Kill him BEFORE broadcasting, otherwise we 'll get infinite recursion!
+ //
+ // **********************************************************************
+ $latesender = $info['user'];
+ $this->dismiss_set($sessionid);
+ $this->message_broadcast($msg, $latesender);
}
function fatal($message) {
$message .= "\n";
if($this->_logfile) {
- $this->trace($message);
+ $this->trace($message, E_USER_ERROR);
}
echo "FATAL ERROR:: $message\n";
die();
}
+ function init_sockets() {
+ global $CFG;
+
+ $this->trace('Setting up sockets');
+
+ if(false === ($this->listen_socket = socket_create(AF_INET, SOCK_STREAM, 0))) {
+ // Failed to create socket
+ $lasterr = socket_last_error();
+ $this->fatal('Error: socket_create() failed: '. socket_strerror($lasterr).' ['.$lasterr.']');
+ }
+
+ //socket_close($DAEMON->listen_socket);
+ //die();
+
+ if(!socket_bind($this->listen_socket, $CFG->chat_serverip, $CFG->chat_serverport)) {
+ // Failed to bind socket
+ $lasterr = socket_last_error();
+ $this->fatal('Error: socket_bind() failed: '. socket_strerror($lasterr).' ['.$lasterr.']');
+ }
+
+ if(!socket_listen($this->listen_socket, $CFG->chat_servermax)) {
+ // Failed to get socket to listen
+ $lasterr = socket_last_error();
+ $this->fatal('Error: socket_listen() failed: '. socket_strerror($lasterr).' ['.$lasterr.']');
+ }
+
+ // Socket has been initialized and is ready
+ $this->trace('Socket opened on port '.$CFG->chat_serverport);
+
+ // [pj]: I really must have a good read on sockets. What exactly does this do?
+ // http://www.unixguide.net/network/socketfaq/4.5.shtml is still not enlightening enough for me.
+ socket_set_option($this->listen_socket, SOL_SOCKET, SO_REUSEADDR, 1);
+ socket_set_nonblock($this->listen_socket);
+ }
+
function cli_switch($switch, $param = NULL) {
switch($switch) { //LOL
+ case 'reset':
+ // Reset sockets
+ $this->_resetsocket = true;
+ return false;
case 'start':
// Start the daemon
$this->_readytogo = true;
die();
}
-/// Try to set up all the sockets ////////////////////////////////////////////////
-
-$DAEMON->trace('Setting up sockets');
-
if (!function_exists('socket_set_option')) {
// PHP < 4.3
if (!function_exists('socket_setopt')) {
}
}
-// Creating socket
-
-if(false === ($DAEMON->listen_socket = socket_create(AF_INET, SOCK_STREAM, 0))) {
- // Failed to create socket
- $DAEMON->last_error = socket_last_error();
- echo "Error: socket_create() failed: ". socket_strerror(socket_last_error($DAEMON->last_error)).' ['.$DAEMON->last_error."]\n";
- die();
-}
+$DAEMON->init_sockets();
-//socket_close($DAEMON->listen_socket);
-//die();
-
-if(!socket_bind($DAEMON->listen_socket, $CFG->chat_serverip, $CFG->chat_serverport)) {
- // Failed to bind socket
- $DAEMON->last_error = socket_last_error();
- echo "Error: socket_bind() failed: ". socket_strerror(socket_last_error($DAEMON->last_error)).' ['.$DAEMON->last_error."]\n";
-
- if($DAEMON->last_error != 98) {
- die();
- }
-
-}
-if(!socket_listen($DAEMON->listen_socket, $CFG->chat_servermax)) {
- // Failed to get socket to listen
- $DAEMON->last_error = socket_last_error();
- echo "Error: socket_listen() failed: ". socket_strerror(socket_last_error($DAEMON->last_error)).' ['.$DAEMON->last_error."]\n";
- die();
-}
-
-// Socket has been initialized and is ready
-$DAEMON->trace('Socket opened on port '.$CFG->chat_serverport);
-
-// [pj]: I really must have a good read on sockets. What exactly does this do?
-// http://www.unixguide.net/network/socketfaq/4.5.shtml is still not enlightening enough for me.
-socket_set_option($DAEMON->listen_socket, SOL_SOCKET, SO_REUSEADDR, 1);
-socket_set_nonblock($DAEMON->listen_socket);
-
-/// Sockets all set up! Now we loop and process incoming data.
/*
declare(ticks=1);
// setup signal handlers
pcntl_signal(SIGTERM, "sig_handler");
pcntl_signal(SIGHUP, "sig_handler");
-*/
if($DAEMON->_pcntl_exists && false) {
$DAEMON->trace('Unholy spirit possession: daemonizing');
// Cannot go demonic
$DAEMON->trace('Unholy spirit possession failed: PHP is not compiled with --enable-pcntl');
}
+*/
$DAEMON->trace('Started Moodle chatd on port '.$CFG->chat_serverport.', listening socket '.$DAEMON->listen_socket, E_USER_WARNING);
}
}
+ $now = time();
+
+ // Clean up chatrooms with no activity as required
+ if($now - $DAEMON->_last_idle_poll > $DAEMON->_freq_poll_idle_chat) {
+ $DAEMON->poll_idle_chats($now);
+ }
+
// Finally, accept new connections
$DAEMON->conn_accept();
@socket_shutdown($DAEMON->listen_socket, 0);
die("\n\n-- terminated --\n");
-
-function chat_socket_write($connection, $text) {
- $check_socket = array($connection);
- $socket_changed = socket_select($read = NULL, $check_socket, $except = NULL, 0, 0);
- if($socket_changed > 0) {
- $written = socket_write($connection, $text, strlen($text));
- return true;
- }
- return false;
-}
-
-
?>