php cli编写一个高并发的http服务

php cli编写一个高并发的http服务

php cli编写一个高并发的http服务

大家都知道php的http请求处理脚本是运行在apache或php-fpm中,那么除此之外还有其他吗?有,其实php cli命令行中也是可以响应http请求并处理的,最简单的参照对象就是workman、swoole,php可以监听tcp或udp的端口,实现http协议,今天我们来试一试

<?php
$socket = stream_socket_server("0.0.0.0:8889", $errno, $errstr);
stream_set_blocking($socket, 0);//设置是否阻塞

if (!$socket) {
    echo "$errstr ($errno)<br />\n";
} else {
    while (true) {
        $conn = @stream_socket_accept($socket);//接受客户端的请求

        if ($conn)
        {
            fwrite($conn, '当前时间是 ' . date('n/j/Y g:i a') . "\n");//响应客户端请求
            fclose($conn);

        } else {
            echo "no newSocket\n";
        }
    }
}
?>

上面面是个简单的http监听服务,具体的post、session等处理还需有更进一步的优化,好了,有的人肯定好奇,就这么几句话就实现高并发了?

没那么简单,高并发还需要多个进程来进行处理,php-fpm也不止一个呀,好了,我们来写一个Worker,实现进程自我创建和管理,提高php的并发处理能力,就想餐馆,一个服务员不够就多个嘛

<?php

class Worker
{
    /**
     * version
     * @var string
     */
    const VERSION = '0.0.1';

    /**
     * Emitted when data is received
     * @var callable
     */
    public $onMessage = NULL;

    /**
     * 子进程的数量
     * @var int
     */
    public $count = 1;

    /**
     * Listening socket
     * @var resource
     */
    public $socket = NULL;

    /**
     * Socket name The format is like this 0.0.0.0:8000
     * @var string
     */
    protected $_socketName = '';

    /**
     * 连接socket
     * @var resource
     */
    protected $_connSocket = NULL;

    /**
     * 子进程 pid 数组
     * @var array
     */
    private $forkList = [];

    /**
     * 主进程 ID 文件
     * @var string
     */
    private $masterPidFile = 'master.pid';

    /**
     * 主进程是否停止
     * @var int
     */
    private $masterStop = 0;

    public function __construct($socketName, $options = [])
    {
        $this->_socketName = $socketName;
    }

    public function init()
    {
        $this->parseCommand();
    }

    public function start()
    {
        // 判断当前程序是否已经启动
        $masterPidFileExist = is_file($this->masterPidFile);
        if ($masterPidFileExist) {
            exit("当前程序已经在运行,请不要重启启动
");
        }

        // 保存主进程pid到文件用于stop,reload,status等命令操作
        $masterPid = posix_getpid();
        file_put_contents($this->masterPidFile, $masterPid);


        pcntl_signal(SIGINT, array($this, 'signalHandler'), false);
        pcntl_signal(SIGTERM, array($this, 'signalHandler'), false);
        pcntl_signal(SIGUSR1, array($this, 'signalHandler'), false);

        $this->listen();

        // 创建count个子进程,用于接受请求和处理数据
        while(count($this->forkList) < $this->count) {
            $this->forkWokrer();
        }

        while (true)
        {
            //sleep(1);
            pcntl_signal_dispatch(); // 信号分发
            $status = 0;
            $pid = pcntl_wait($status, WUNTRACED); // 堵塞直至获取子进程退出或中断信号或调用一个信号处理器,或者没有子进程时返回错误
            pcntl_signal_dispatch();


            if ($pid > 0) {
                // 子进程退出
                echo "子进程退出pid:{$pid}\n";
                unset($this->forkList[$pid]);
                // 关闭还是重启
                if (!$this->masterStop) {
                    // 重启
                    $this->forkWokrer();
                }
            }
        }
    }

    public function listen()
    {
        // 主进程创建tcp服务器
        $errno = 0;
        $errmsg = '';
        $socket = stream_socket_server($this->_socketName, $errno, $errmsg);

        // 尝试打开KeepAlive TCP和禁用Nagle算法。
        if (function_exists('socket_import_stream'))
        {
            $socketImport = socket_import_stream($socket);
            @socket_set_option($socketImport, SOL_SOCKET, SO_KEEPALIVE, 1);
            @socket_set_option($socketImport, SOL_TCP, TCP_NODELAY, 1);
        }
        // Non blocking.
        stream_set_blocking($socket, 0);

        $this->socket = $socket;
    }

    public function accept()
    {
        $base = event_base_new();
        $event = event_new();

        event_set($event, $this->socket, EV_READ | EV_PERSIST, [$this, 'acceptConnect'], [$event, $base]);
        event_base_set($event, $base);
        event_add($event);
        event_base_loop($base);
    }


    public function acceptConnect($socket, $events, $arg)
    {
        $newSocket = @stream_socket_accept($socket, 0);

        if (!$newSocket)
        {
            return;
        }

        echo "acceptConnect\n";

        stream_set_blocking($newSocket, 0);

        // 子进程添加一个事件在newSocket文件描述符上
        $event = event_new();
        // 设置event监听事件,监听newSocket文件描述符,事件为EV_READ:可读,EV_PERSIST:持续化(断开连接可被监听到)
        event_set($event, $newSocket, EV_READ | EV_PERSIST, array($this, "handleData"), array($event, $arg[1]));
        event_base_set($event, $arg[1]);
        event_add($event);
    }

    public function handleData($newSocket, $events, $arg)
    {
        $this->_connSocket = $newSocket;
        $buffer = @fread($newSocket,65535);

        if ($buffer === '' || $buffer === false) {
            if (feof($newSocket) || !is_resource($newSocket) || $buffer === false) {
                echo "客户端关闭
";
                event_del($arg[0]); //关闭连接事件
                @fclose($this->_connSocket); // 关闭连接
                return;
            }
        }
        
        call_user_func($this->onMessage, $this); // 调用处理函数
    }


    public function send($data)
    {
       
        fwrite($this->_connSocket, $data, 8192);
        return true;
    }

    public function forkWokrer()
    {
        $pid = pcntl_fork();
        if ($pid == -1) {
            die('子进程创建失败');
        } else if ($pid == 0) {
            //处理逻辑
            $this->accept();
        } else {
            $this->forkList[$pid] = $pid;
        }
    }

    /**
     * 主进程处理信号
     * @param $sigNo
     */
    public function signalHandler($sigNo)
    {
        switch ($sigNo) {
            // Stop.
            case SIGTERM:
            case SIGINT:
                // 退出,先发送子进程信号关闭子进程,再等待主进程退出
                foreach ($this->forkList as $pid) {
                    echo "关闭子进程pid:{$pid}\n" ;
                    posix_kill($pid, SIGKILL);
                }
                unlink($this->masterPidFile);
                $this->masterStop = 1;
                exit(250);
                break;

            case SIGUSR1:
                // 重启,关闭当前存在但子进程,主进程会监视退出的子进程并重启一个新子进程
                foreach ($this->forkList as $pid) {
                    echo "关闭子进程pid:{$pid}\n" ;
                    posix_kill($pid, SIGKILL);
                }
                break;

            default:
                // 处理所有其他信号
        }
    }

    /**
     * 发送命令给主进程
     * @param $command
     */
    public function sendSignalToMaster($command)
    {
        $masterPid = file_get_contents($this->masterPidFile);

        if ($masterPid) {
            switch ($command) {
                case 'stop':
                    posix_kill($masterPid, SIGINT);
                    break;
                case 'reload':
                    posix_kill($masterPid, SIGUSR1);
                    break;
            }
            exit;
        } else {
            echo "主进程不存在\n";
            exit;
        }
    }

    protected function parseCommand()
    {
        global $argv;

        $available_commands = array(
            'start',
            'stop',
            'restart',
            'reload',
            'status',
            'connections',
        );
        $command  = isset($argv[1]) ? trim($argv[1]) : '';
        $usage = "Usage: php index.php {" . implode('|', $available_commands) . "}\n";
        if (empty($command) || !in_array($command, $available_commands)) {
            exit($usage);
        }

        switch ($command) {
            case 'start':
                $this->start();
                break;
            case 'stop':
            case 'reload':
            case 'status':
                $this->sendSignalToMaster($command);
                break;
        }
    }
}
?>

ok,我们来调用一下

<?php
require_once 'Woker.php';

$woker = new Worker('0.0.0.0:8889');
$woker->count = 4;?/创建4个子进程处理
$woker->onMessage = function (Worker $connection) {
    $connection->send("Hi world\n");
};
$woker->init();
?>

这只是一个简单的例子,除了tcp,还可以创建udp、websocket等,非常强大,另外在使用phpcli编程时,一定要注意内存的泄漏问题


{{collectdata}}

网友评论0