Skip to content

Commit f0a1254

Browse files
committed
broadcast socket server
1 parent 671c11e commit f0a1254

File tree

5 files changed

+201
-27
lines changed

5 files changed

+201
-27
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
Muilti Threaded Socket Server based on pcntl_fork()
22
====================
3+
Examaple created for http://systemsarchitect.net/multi-threaded-socket-server-in-php-with-fork/
4+
35

46
Requirements
57
---------------------

server-broadcast.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?php
2+
3+
/**
4+
* Check dependencies
5+
*/
6+
if( ! extension_loaded('sockets' ) ) {
7+
echo "This example requires sockets extension (http://www.php.net/manual/en/sockets.installation.php)\n";
8+
exit(-1);
9+
}
10+
11+
if( ! extension_loaded('pcntl' ) ) {
12+
echo "This example requires PCNTL extension (http://www.php.net/manual/en/pcntl.installation.php)\n";
13+
exit(-1);
14+
}
15+
16+
/**
17+
* Connection handler
18+
*/
19+
function onConnect( $client ) {
20+
$pid = pcntl_fork();
21+
22+
if ($pid == -1) {
23+
die('could not fork');
24+
} else if ($pid) {
25+
return $pid;
26+
}
27+
28+
printf( "[%s] Connected at port %d\n", $client->getAddress(), $client->getPort() );
29+
30+
\Sock\SocketServerBroadcast::broadcast( array( 'data' => "Connected\n", 'type' => 'msg' ) );
31+
32+
$read = '';
33+
while( true ) {
34+
35+
$read = $client->read();
36+
if( $read == '' ) {
37+
break;
38+
}
39+
40+
\Sock\SocketServerBroadcast::broadcast( array( 'data' => $read, 'type' => 'msg' ) );
41+
}
42+
43+
\Sock\SocketServerBroadcast::broadcast( array( 'type' => 'disc' ) );
44+
45+
printf( "[%s] Disconnected\n", $client->getAddress() );
46+
$client->close();
47+
}
48+
49+
require "sock/SocketServerBroadcast.php";
50+
51+
$server = new \Sock\SocketServerBroadcast();
52+
$server->init();
53+
$server->setConnectionHandler( 'onConnect' );
54+
$server->listen();

server.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ function onConnect( $client ) {
3434
if( $read != '' ) {
3535
$client->send( '[' . date( DATE_RFC822 ) . '] ' . $read );
3636
}
37+
else {
38+
break;
39+
}
3740

3841
if( preg_replace( '/[^a-z]/', '', $read ) == 'exit' ) {
3942
break;
@@ -46,9 +49,9 @@ function onConnect( $client ) {
4649
printf( "[%s] recieved: %s", $client->getAddress(), $read );
4750
}
4851
}
49-
50-
printf( "[%s] Disconnected\n", $client->getAddress() );
5152
$client->close();
53+
printf( "[%s] Disconnected\n", $client->getAddress() );
54+
5255
}
5356

5457
require "sock/SocketServer.php";

sock/SocketServer.php

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77

88
class SocketServer {
99

10-
private $sockServer;
11-
private $address;
12-
private $port;
13-
private $_listenLoop;
14-
private $connectionHandler;
10+
protected $sockServer;
11+
protected $address;
12+
protected $port;
13+
protected $_listenLoop;
14+
protected $connectionHandler;
1515

1616
public function __construct( $port = 4444, $address = '127.0.0.1' ) {
1717
$this->address = $address;
@@ -54,31 +54,38 @@ public function listen() {
5454
socket_strerror(socket_last_error( $this->sockServer ) ) );
5555
}
5656

57-
printf( "Listening on %s:%d...\n", $this->address, $this->port );
58-
5957
$this->_listenLoop = true;
58+
$this->beforeServerLoop();
59+
$this->serverLoop();
6060

61+
socket_close( $this->sockServer );
62+
}
63+
64+
protected function beforeServerLoop() {
65+
printf( "Listening on %s:%d...\n", $this->address, $this->port );
66+
}
67+
68+
protected function serverLoop() {
6169
while( $this->_listenLoop ) {
62-
if( ( $client = socket_accept( $this->sockServer ) ) === false ) {
63-
throw new SocketException(
64-
SocketException::CANT_ACCEPT,
65-
socket_strerror(socket_last_error( $this->sockServer ) ) );
66-
}
67-
68-
$socketClient = new SocketClient( $client );
69-
70-
if( is_array( $this->connectionHandler ) ) {
71-
$object = $this->connectionHandler[0];
72-
$method = $this->connectionHandler[1];
73-
$object->$method( $socketClient );
74-
}
75-
else {
76-
$function = $this->connectionHandler;
77-
$function( $socketClient );
70+
if( ( $client = @socket_accept( $this->sockServer ) ) === false ) {
71+
throw new SocketException(
72+
SocketException::CANT_ACCEPT,
73+
socket_strerror(socket_last_error( $this->sockServer ) ) );
74+
continue;
75+
}
76+
77+
$socketClient = new SocketClient( $client );
78+
79+
if( is_array( $this->connectionHandler ) ) {
80+
$object = $this->connectionHandler[0];
81+
$method = $this->connectionHandler[1];
82+
$object->$method( $socketClient );
83+
}
84+
else {
85+
$function = $this->connectionHandler;
86+
$function( $socketClient );
7887
}
7988
}
80-
81-
socket_close( $this->sockServer );
8289
}
8390

8491
}

sock/SocketServerBroadcast.php

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
<?php
2+
3+
namespace Sock;
4+
5+
require_once "SocketServer.php";
6+
7+
class SocketServerBroadcast extends SocketServer {
8+
9+
const PIPENAME = '/tmp/broadcastserver.pid';
10+
11+
private static $pid;
12+
13+
protected $pipe;
14+
15+
private $connections = array();
16+
17+
public function __construct( $port = 4444, $address = '127.0.0.1' ) {
18+
parent::__construct( $port, $address );
19+
self::$pid = posix_getpid();
20+
if(!file_exists(self::PIPENAME)) {
21+
umask(0);
22+
if( ! posix_mkfifo(self::PIPENAME, 0666 ) ) {
23+
die('Cant create a pipe: ' . self::PIPENAME);
24+
}
25+
}
26+
27+
$this->pipe = fopen(self::PIPENAME, 'r+');
28+
}
29+
30+
public function handleProcess() {
31+
$len = $this->bytesToInt( fread($this->pipe, 4) );
32+
$message = unserialize( fread( $this->pipe, $len ) );
33+
if( $message['type'] == 'msg' ) {
34+
$client = $this->connections[ $message['pid'] ];
35+
$msg = sprintf('[%s] (%d):%s', $client->getAddress(), $message['pid'], $message['data'] );
36+
printf( "Broadcast: %s", $msg );
37+
foreach( $this->connections as $pid => $conn ) {
38+
if( $pid == $message['pid'] ) {
39+
continue;
40+
}
41+
42+
$conn->send( $msg );
43+
}
44+
}
45+
else if( $message['type'] == 'disc' ) {
46+
unset( $this->connections[ $message['pid'] ] );
47+
}
48+
}
49+
50+
public function bytesToInt($char) {
51+
$num = ord($char[0]);
52+
$num += ord($char[1]) << 8;
53+
$num += ord($char[2]) << 16;
54+
$num += ord($char[3]) << 24;
55+
return $num;
56+
}
57+
58+
protected function beforeServerLoop() {
59+
parent::beforeServerLoop();
60+
socket_set_nonblock( $this->sockServer );
61+
pcntl_signal(SIGUSR1, array($this, 'handleProcess'), true);
62+
}
63+
64+
protected function serverLoop() {
65+
while( $this->_listenLoop ) {
66+
if( ( $client = @socket_accept( $this->sockServer ) ) === false ) {
67+
$info = array();
68+
if( pcntl_sigtimedwait(array(SIGUSR1),$info,1) > 0 ) {
69+
$this->handleProcess();
70+
}
71+
continue;
72+
}
73+
74+
$socketClient = new SocketClient( $client );
75+
76+
if( is_array( $this->connectionHandler ) ) {
77+
$object = $this->connectionHandler[0];
78+
$method = $this->connectionHandler[1];
79+
$childPid = $object->$method( $socketClient );
80+
}
81+
else {
82+
$function = $this->connectionHandler;
83+
$childPid = $function( $socketClient );
84+
}
85+
86+
$this->connections[ $childPid ] = $socketClient;
87+
}
88+
unlink(self::PIPENAME);
89+
}
90+
91+
static function broadcast( Array $msg ) {
92+
$msg['pid'] = posix_getpid();
93+
$message = serialize( $msg );
94+
$f = fopen(self::PIPENAME, 'w');
95+
fwrite($f, self::strlenInBytes($message) . $message);
96+
fclose($f);
97+
posix_kill(self::$pid, SIGUSR1);
98+
}
99+
100+
static function strlenInBytes($str) {
101+
$len = strlen($str);
102+
$chars = chr( $len & 0xFF );
103+
$chars .= chr( ($len >> 8 ) & 0xFF );
104+
$chars .= chr( ($len >> 16 ) & 0xFF );
105+
$chars .= chr( ($len >> 24 ) & 0xFF );
106+
return $chars;
107+
}
108+
}

0 commit comments

Comments
 (0)