手撸原生swoole完整版(http、websocket服务端、代码热更新等) | php 技术论坛-380玩彩网官网入口
在上一篇文章中,我们介绍了swoole的安装与基本使用,
在这篇文章中,完整的演示了swoole的使用,
http服务端、websocket服务端、代码热更新,
接收到请求时,转发到真正的逻辑文件进行执行,
在逻辑文件中,如何完成数据交互与执行,
在后期文件中,我们将结合redis队列实现商品的抢购
新建文件 swoole.php 及 feng/index.php
swoole.php
/**
 * @author: [feng] <[email protected]>
 * @date:   2024-08-20 13:26:14
 * @last modified by:   [feng] <[email protected]>
 * @last modified time: 2024-08-31 17:38:36
 */
use \swoole\server\helper;
use \swoole\websocket\frame;
use \swoole\websocket\server as websocketserver;
use \swoole\server;
use \swoole\timer;
use \swoole\process;
/**
 * swoole websocket server 命令行服务类
 * 此文件的修改不支持热更新,请于更新后重启swoole-websocket服务
 */
class websocket
{
    protected $monitor;
    protected $md5file;
    // swoole对象
    public $swoole;
    // socket的类型
    protected $socktype = swoole_sock_tcp;
    // 运行模式
    protected $mode = swoole_process;
    // swoolecommon 类实例
    protected $swoolecommon;
    /**
     * swoolecommon 类实例
     */
    protected $config = [
        "wss_switch"    => "0",
        // "ssl_cert_file" => "/www/wwwroot/yourdomain/cert/im.pem",
        // "ssl_key_file"  => "/www/wwwroot/yourdomain/cert/im.key",
        "websocket_port" => "9501",
        "worker_num"    => "2",
        "task_worker_num" => "2",
        "reactor_num"   => "2",
        "max_connections" => "20480",
        'daemonize' => false,
        'max_request' => 100000,
        'task_enable_coroutine' => true, // 关闭task协程
        'max_wait_time' => 10,
        'reload_async' => true, // 异步模式下启动管理进程
    ];
    /**
     * 支持的响应事件
     * @var array
     */
    protected $event = [
        'start',
        'shutdown',
        'workerstart',
        'workerstop',
        'workerexit',
        'receive',
        'close',
        'task',
        'finish',
        'pipemessage',
        'workererror',
        'managerstart',
        'managerstop',
        'open',
        'request',
        'message',
        'handshake',
    ];
    function __construct()
    {
        if ($this->config['wss_switch']) {
            if (file_exists($this->config['ssl_cert_file']) && file_exists($this->config['ssl_key_file'])) {
                $this->socktype = swoole_sock_tcp | swoole_ssl;
            } else {
                throw new \exception('ssl certificate file does not exist!');
            }
        }
        $this->swoole = new websocketserver('127.0.0.1', $this->config['websocket_port'], $this->mode, $this->socktype);
        // 设置文件监控(监控时间秒,监控目录)(正式环境勿开,很占cpu)
        $this->setmonitor(1000, ['./feng/']);
        // 初始化swoole配置
        $this->option($this->config);
        $agreement = $this->config['wss_switch'] ? 'wss://' : 'ws://';
        $address   = $agreement . "127.0.0.1:" . $this->config['websocket_port'];
        echo "swoole websocket server started: <" . $address . ">\n";
        echo "you can exit with `ctrl-c`\n";
        $this->swoole->start();
    }
    /**
     * worker 进程启动
     * @param  $server
     * @param  $worker_id
     */
    public function onworkerstart($server, $worker_id)
    {
        if (0 == $worker_id && $this->monitor) {
            // print_r(get_included_files());// 查看不支持热更新的文件列表
            $this->monitor($server);
        }
    }
    /**
     * 链接握手成功
     * @param  $server
     * @param  $frame
     */
    public function onopen($server, $frame)
    {
        $server->push($frame->fd, json_encode([
            'type' => 'init',
            'client_id' => $frame->fd,
        ]));
    }
    /**
     * request回调
     * 当请求过来时,会调用此方法,然后转发到真正的逻辑文件执行
     * @param $request
     * @param $response
     */
    public function onrequest($request, $response) {
        $data = $request->post ?? ($request->get ?? []);
        // array_walk_recursive($data, ['app\worker\logic\common', 'checkvariable']);
        $server = $this->swoole;//调用外部的server
        $client_id = $request->fd;
        if ($request->server['path_info'] == '/favicon.ico' || $request->server['request_uri'] == '/favicon.ico') {
            $response->status(404);
            $response->end();
            return;
        }
        $uri = array_filter(explode('/', ltrim($request->server['request_uri'], '/wss')));
        [$c, $a] = (count($uri) == 1) ? [$uri[0], ''] : ($uri ?: [$data['c'] ?? '', $data['a'] ?? '']);
        $response->header('content-type', 'application/json; charset=utf-8');
        // 检查要访问的类是否存在
        if (empty($data)) {
            $response->end(msg(0, '参数缺失,请填写完整参数'));
            return;
        }
        $filename = __dir__ . '/feng/' . ucfirst($c) . '.php';
        if (file_exists($filename) && !empty($c)) { // 判定文件是否存在
            if (is_readable($filename)) {
                require_once $filename;
            }
            $classname = "\\feng\\" . ucfirst($c);
            // 检查要访问的类是否存在
            if (!class_exists($classname, false)) {
                $response->end(msg(0, '访问的控制器不存在!'));
                return;
            }
        } else {
            $response->end(msg(0, '访问的控制器文件不存在!'));
            return;
        }
        $o = new $classname([$server, $response, $this->swoolecommon]); // 新建对象
        if (!empty($a) && !method_exists($o, $a)) {
            $response->end(msg(0, '访问的方法不存在!'));
            return;
        }
        if (isset($o) && $a) {
            $result = call_user_func_array([$o, $a], [$data]); //调用对象$o($c)里的方法$a
            if ($result) {
                $json = is_array($result) ? msg($result) : $result;
                $response->end($json);
            }
        } else {
            $json = msg(0, '参数缺失,请填写要执行的操作');
            $response->end($json);
        }
    }
    /**
     * 收到数据帧
     * 与request请求类似,转发到真正的逻辑文件执行
     * @param  $server
     * @param  $frame
     */
    public function onmessage($server, $frame)
    {
        $message = json_decode($frame->data, true) ?: $frame->data;
        // 安全检查过i滤
        // array_walk_recursive($message, ['app\worker\logic\common', 'checkvariable']);
        $data = $message['data'] ?? [];
        $client_id = $frame->fd;
        if (!is_array($message) || !isset($message['c']) || !isset($message['a'])) {
            $server->push($client_id, msg('show_msg', '参数缺失,请填写完整参数'));
            return;
        }
        $filename = __dir__ . '/feng/' . ucfirst($message['c']) . '.php';
        if (file_exists($filename)) { // 判定文件是否存在
            if (is_readable($filename)) {
                require_once $filename;
            }
            $classname = "\\feng\\" . ucfirst($message['c']);
            // 检查要访问的类是否存在
            if (!class_exists($classname, false)) {
                $server->push($client_id, msg('show_msg', '访问的控制器不存在!'));
                $server->close($client_id);
                return;
            }
        } else {
            $server->push($client_id, msg('show_msg', '错误的请求2!'));
            $server->close($client_id);
            return;
        }
        $o = new $classname([$server, $frame, $this->swoolecommon]); // 实例化类
        $a = $message['a']; // 方法名称
        if (!method_exists($o, $message['a'])) {
            $server->push($client_id, msg('show_msg', '访问的方法不存在!'));
            return;
        }
        $result = call_user_func_array([$o, $a], [$data]); //调用对象$o($c)里的方法$a
        $json = is_array($result) ? msg($result) : $result;
        $server->push($client_id, $json); // 发送消息
    }
    /**
     * 链接关闭
     */
    public function onclose($server, $fd, $reactorid)
    {
        // 解除所有绑定关系
        // $this->swoolecommon->unbindfd($fd);
    }
    /**
     * @param $serv
     * @param $taskid
     * @param $workerid
     * @param $data
     */
    public function ontask($server, $taskid, $workerid, $data)
    {
        // 分发 task 任务机制,让不同的任务 走不同的逻辑
        // $obj = new app\common\lib\task\task;
        // $method = $data['method'];
        // timer::tick(5000, function() use ($taskid){
        //     echo "taskid  " . $taskid . "  timeout " . rand(1111,9999) . "\n";
        //     $flag = rand(1111,9999);
        //     return $flag; // 告诉worker
        // });
        $flag = rand(1111,9999);
        return $flag; // 告诉worker
    }
    /**
     * @param $server
     * @param $taskid
     * @param $data
     */
    public function onfinish($server, $taskid, $data) {
        echo "taskid:{$taskid}\n";
        echo "finish-data-sucess:{$data}\n";
    }
    public function option(array $option)
    {
        if (!empty($option)) {
            $this->swoole->set($this->checkoptions($option));
        }
        // 注册回调
        foreach ($this->event as $event) {
            if (method_exists($this, 'on' . $event)) {
                $this->swoole->on($event, [$this, 'on' . $event]);
            }
        }
        // 实例化swoolecommon类
        // $this->swoolecommon = new \app\worker\library\common($option['max_connections'], $this->swoole);
    }
    protected function checkoptions(array $options)
    {
        if (class_exists(helper::class)) {
            $constoptions = helper::global_options  helper::server_options  helper::port_options  helper::helper_options;
            foreach ($options as $k => $v) {
                if (!array_key_exists(strtolower($k), $constoptions)) {
                    unset($options[$k]);
                }
            }
        }
        return $options;
    }
    public function setmonitor($interval = 2, $path = [])
    {
        $this->monitor['interval'] = $interval;
        $this->monitor['path']     = (array)$path;
    }
    /**
     * 文件监控
     * @param $server
     */
    public function monitor($server)
    {
        if ($this->monitor['path']) {
            $serverid = $server->tick($this->monitor['interval'], function ($serverid) use ($server) {
                $md5arr=[];
                foreach ($this->monitor['path'] as $path) {
                    $files = glob(rtrim($path, '/') . "/*.php");
                    foreach ($files as $file){
                        $md5arr[] = md5_file($file);
                    }
                }
                $md5value = md5(implode('',$md5arr));
                if ($this->md5file==''){
                    $this->md5file = $md5value;
                    return;
                }
                //文件有改动
                if (strcmp($this->md5file, $md5value)!==0){
                    $this->md5file = $md5value;
                    echo "[update] the service folder has changed and is currently reloading...\n";
                    $server->cleartimer($serverid);
                    process::kill($server->master_pid, sigusr1); // 重启服务
                    return;
                }
            });
            // // 创建一个inotify实例(linux下安装inotify服务,pecl install inotify)
            // $fd = inotify_init();
            // // 添加需要监控的事件
            // $mask = in_delete | in_create | in_moved_from | in_moved_to | in_close_write;
            // foreach ($this->monitor['path'] as $path) {
            //     inotify_add_watch($fd, $path, $mask); // 添加监控目录
            // }
            // // 循环读取inotify事件
            // $serverid = $server->tick($this->monitor['interval'], function ($serverid) use ($server, $fd) {
            //     $events = inotify_read($fd);
            //     if ($events) {
            //         foreach ($events as $event) {
            //             // 检测到文件更改,重启服务
            //             echo "[update] the service folder has changed and is currently reloading...\n";
            //             $server->cleartimer($serverid);
            //             process::kill($server->master_pid, sigusr1); // 重启服务
            //             return;
            //         }
            //     }
            // });
        }
    }
    /**
     * 魔术方法 有不存在的操作的时候执行
     * @access public
     * @param string $method 方法名
     * @param array $args 参数
     * @return mixed
     */
    public function __call($method, $args)
    {
        call_user_func_array([$this->swoole, $method], $args);
    }
}
new websocket();
/**
 * [result 返回状态数组]
 * @param  [type] $code [错误码]
 * @param  string $data [具体数据]
 * @return [type]       [description]
 */
function msg($code, $msg=false, $data=false)
{
    $code = is_string($code) ? trim($code) : $code;
    if (is_string($code) && preg_match('/^[a-za-z_-] $/', $code) === 1) {
        $result = ['type'=>$code, 'data'=>$msg];
    } else {
        if (is_numeric($code)) {
            $result = ['code'=>$code, 'msg'=>$msg, 'data'=>$data, 'time'=>time()];
        } else {
            $msg || $msg = '操作成功';
            $data = $data ?: $code;
            $result = ['code'=>1, 'msg'=>$msg, 'data'=>$code, 'time'=>time()];
        }
    }
    return json_encode($result, json_unescaped_unicode);
}
feng/index.php
/**
 * @author: [feng] <[email protected]>
 * @date:   2024-08-28 13:26:14
 * @last modified by:   [feng] <[email protected]>
 * @last modified time: 2024-08-31 17:49:07
 */
namespace feng;
class index
{
    public $userinfo = [];
    protected $server;
    protected $reids;
    protected $frame;
    protected $request;
    protected $swoolecommon;
    public function __construct($base = [])
    {
        if ($base) {
            [$this->server, $this->frame, $this->swoolecommon] = $base;
        }
        // $this->redis = (new \redis())->init($config); //实例化redis
    }
    public function ceshi($message=[])
    {
        $data = [
            'content'=>'这是一段消息',
            'date' => date('y-m-d h:i:s'),
        ];
        return array_merge($data, $message);
    }
    public function index($value='')
    {
        return 'ceshi消息';
    }
}开启swoole服务
(windows下使用 swoole swoole.php 开启服务)
php swoole.php访问http服务接口 http://127.0.0.1:9501/index/ceshi?id=1
本作品采用《cc 协议》,转载必须注明作者和本文链接
 
 
推荐文章: