PHP-MSF

示例项目

使用msf构建业务应用的推荐方式,主要目录结构为

.
├── README.md
├── app // PHP业务代码
│   ├── AppServer.php // 应用server类,可根据需求自定义
│   ├── Controllers // 控制器类目录
│   ├── Lib // 特殊逻辑处理类目录
│   ├── Models // Model类目录
│   ├── Route // 特殊路由规则****目录
│   ├── Tasks // Task类目录
│   └── Views // 视图文件目录
├── build.sh // 构建脚本(拉取docker镜像,启动容器)
├── checkstyle.sh // 代码检查脚本
├── composer.json // composer包依赖配置文件
├── config // 配置目录
│   ├── check.php // 代码检查配置
│   ├── server.php // 主配置文件(server服务相关)
│   ├── constant.php // 业务常量定义文件
│   ├── log.php // 全局日志配置
│   ├── http.php // HTTP服务配置
│   ├── params.php // 全局业务配置(和运行环境无关)
│   ├── dev // 研发联调环境特殊配置目录
│   ├── docker // docker环境特殊配置目录
│   ├── product // 生产环境特殊配置目录
│   ├── qa // QA环境特殊配置目录
├── server.php // server启动脚本
├── console.php // 命令行脚本
├── test // 单元测试目录
└── www  // server根目录
    └── index.html // 索引文件
master分支(框架源代码)

框架源码,支持composer安装,具有composer包管理的所有特性,仅为框架代码,不包含任何业务逻辑,主要目录结构为

.
├── Base // 核心
│   ├── AOPFactory.php //切片类工厂
│   ├── Child.php // 基类
│   ├── Core.php // 内核基类
│   ├── Input.php // 用户输入对象
│   └── Output.php // 用户输出对象
│   └── Pool.php // 通用对象池类
├── Client // 客户端
│   ├── ConcurrentClient.php // flex风格的并行请求
│   ├── Http // HTTP客户端
│   │   └── Client.php // HTTP客户端实现
│   └── RpcClient.php // RPC Client
├── Console // 命令行
│   ├── Controller.php // 命令行控制器基类
│   └── Request.php // 命令行请求对象
├── Controllers // Web控制器
│   ├── Controller.php // web请求控制器基类
│   ├── Monitor.php // 内存服务器运行状态控制器
│   ├── Rest.php  // Restful风格的API控制器示例
│   └── Rpc.php  // 处理RPC请求控制器
├── Coroutine // 协程
│   ├── Base.php // IO协程基类
│   ├── CException.php // 协程异常
│   ├── CNull.php // 协程NULL值
│   ├── CTask.php // Task协程
│   ├── Dns.php // DNS协程
│   ├── Http.php // HTTP请求协程
│   ├── IBase.php // IO协程接口类
│   ├── MySql.php // MySQL协程
│   ├── Redis.php // Redis协程
│   ├── Scheduler.php // 协程调度器
│   └── Task.php // 协程类
├── Helpers // 辅助工具
│   ├── Common.php // 全局函数
│   └── Context.php // 请求上下文
├── HttpServer.php // Http Sever类
├── Marco.php // 全局类常量
├── Models // 模型
│   └── Model.php // 模型基类
├── MSFCli.php // Cli命令行
├── MSFServer.php // MSFServer服务类
├── Pack // 打包
│   ├── IPack.php // 打包基类
│   ├── JsonPack.php // Json
│   ├── MsgPack.php // MsgPack
│   └── SerializePack.php // Serialize
├── Pools // 连接池
│   ├── AsynPoolManager.php // 连接池管理器
│   ├── AsynPool.php // 连接池
│   ├── CoroutineRedisProxy.php // Redis连接池的协程辅助类
│   ├── IAsynPool.php // 连接池接口类
│   ├── Miner.php // MySQL Query Builder
│   ├── MysqlAsynPool.php // 异步MySQL连接池
│   └── RedisAsynPool.php // 异步Redis连接池
├── Process // 用户自定义进程
│   ├── Config.php // 配置管理进程
│   ├── Inotify.php // 文件监控进程
│   ├── ProcessBase.php // 自定义进程基类
│   └── Timer.php // 用户自定义定时器进程
├── Proxy // 代理
│   ├── IProxy.php // 代理接口
│   ├── RedisProxyCluster.php // Redis分布式代理类
│   ├── RedisProxyFactory.php // Redis代理工厂类
│   └── RedisProxyMasterSlave.php // Redis主从代理类
├── Rest // Restful
│   ├── Controller.php // Restful风格的API控制器基类
│   └── Serializer.php // Restful相关分页、序列化类
├── Route // 路由
│   ├── IRoute.php // 路由接口类
│   ├── NormalRoute.php // 通用路由类
│   └── RestRoute.php // Rest路由类
├── Server.php // Server基类
├── Tasks // Task
│   ├── MongoDbTask.php // MongoDB Task
│   ├── Task.php // Task基类
│   └── TaskProxy.php // Task Proxy
└── Views // 视图
    ├── error_404.php 
    └── test.html

RESTful

MSF 原生支持RESTful风格api,提供GET/POST/PUT/PATCH/HEAD/OPTIONS/DELETE动作的支持。

verb 介绍
'GET',      // 从服务器取出资源(一项或多项)
'POST',     // 在服务器新建一个资源
'PUT',      // 在服务器更新资源(客户端提供改变后的完整资源)
'PATCH',    // 在服务器更新资源(客户端提供改变的属性)
'DELETE',   // 从服务器删除资源
'HEAD',     // 获取 head 元数据
'OPTIONS',  // 获取信息,关于资源的哪些属性是客户端可以改变的
MSF实现RESTful程序
Rest |- Controller.php 控制器
     |- Route.php 路由器
使用方式
  1. 在配置文件中配置路由器为: $config['server']['route_tool'] = '\\PG\\MSF\\Route\\RestRoute'
  2. 配置URL路由规则
  3. 控制器继承 PG\MSF\Rest\Controller
推荐控制器接收不同动作映射
'PUT,PATCH {id}' => 'update', // 更新资源,如:/users/<id>
'DELETE {id}' => 'delete',    // 删除资源,如:/users/<id>
'GET,HEAD {id}' => 'view',    // 查看资源单条数据,如:/users/<id>
'POST' => 'create',           // 新建资源,如:/users
'GET,HEAD' => 'index',        // 查看资源列表数据(可分页),如:/users
'{id}' => 'options',          // 查看资源所支持的HTTP动词,如:/users/<id> | /users
'' => 'options',
URL路由配置

通过请求url中的path和动作类型即可路由到对应控制器下的某个方法(method)。URL路由配置支持正则方式,在url的path中可携带参数。例如:

$config['rest']['route']['rules'] = [
    'GET,POST /groups' => '/account/profile',
    'GET /users/ask' => 'user/apply',
    'GET /users' => 'user/index',
    'GET /users/<uid:\d+>' => 'user/view',
    'PUT /users/<method:\w+>' => 'user/<method>',
    'DELETE /users/<uid:\d+>' => 'user/delete',
]

MSF协程调度器

MSF协程基于Generator/Yield,IO事件触发,是自主研发的调度器,它的核心思想是发生任何异步IO操作之后,程序的控制流就切换到其他请求,待异步IO可读之后,由程序自行调用调度器接口,进行一次调度,并响应请求。MSF协程完全抛弃了定时器每隔一定时间轮询任务的方案,使得调度器的性能更加接近原生的异步回调方式。

协程执行流程图V2

同步任务

同步任务即Task,用来做一些异步的慢速任务,比如发邮件、批量任何、任何不支持异步Client的服务调用等等。MSF框架是纯异步Server架构,也就是说任何耗时的任务的代码不能放到Worker进程中执行,所以我们利用Swoole的Task模块来完成耗时任务

Task进程

同步任务代码是在Task进程中执行的,Task进程具有以下的特性:

  • 同步阻塞
  • 支持定时器

目前MSF框架封装了异步的Http Client、Redis Client、MySQL Client,除了这几种原生支持异步外,任何其他的非异步Client的网络IO均要封装成Task,比如MongoDB Task

其他注点意

$response>end()函数

swoole_http_server中,当调用$response->end()之后的代码是继续执行的,但不能多次调用

Task是否可使用异步函数以及协程调度?
  1. 不能使用异步函数。Task是同步任务,任务非异步的Client的网络IO都要封装成Task使用。
  2. Task是没有调度器的,task_worker进程只进程onTask函数的处理。
    if (!$serv->taskworker) {//worker进程               
     $this->scheduler = new Scheduler();
     self::setProcessTitle($this->config['server.process_title'] . '-Worker');
    } else {
     self::setProcessTitle($this->config['server.process_title'] . '-Tasker');
    }
    
销毁对象机制
  • 每创建对象都要调用__supportAutoDestroy函数,用以生成类属性默认值待请求结束后重新初始化,以下属性不做重新初始化,永久使用直到被销毁
    • static修饰的成员属性
    • public修饰的__useCount__genTime__coreName__DSLevel__reflections属性
自定义类如何使用框架指定函数创建对象?

核心文件说明

Flexihash\Flexihash

redis组件使用一致性算法的类库

php-aop/src/Wrapper
  • AOP 包装器
  • AOP包装库提供主要切面功能,注入前置及后置方法
  • AOP适配器兼容协程适配对象
php-msf/src/Server.php
/**
* 解析命令行参数
*
* @return void
*/
protected static function parseCommand()
{
    global $argv;
    // Check argv;
    $startFile = $argv[0];
    if (!isset($argv[1])) {
        $argv[1] = 'start';
    }

    // Get command.
    $command = trim($argv[1]);
    $command2 = isset($argv[2]) ? $argv[2] : '';

    // Start command.
    $mode = '';
    if (file_exists(self::$pidFile)) {
        $pids = explode(',', file_get_contents(self::$pidFile));
        // Get master process PID.
        $masterPid = $pids[0];
        $managerPid = $pids[1];
        /**
        * SIG_BLOCK
        * 按照参数 set 提供的屏蔽字,屏蔽信号。
        * 并将原信号屏蔽保存到oldset中。
        */
        $masterIsAlive = $masterPid && @posix_kill($masterPid, SIG_BLOCK);
    } else {
        $masterIsAlive = false;
    }
    // Master is still alive?
    if ($masterIsAlive) {
        if ($command === 'start' || $command === 'test') {
            writeln("{$startFile} already running");
            exit;
        }
    } elseif ($command !== 'start' && $command !== 'test') {
        writeln("{$startFile} not run");
        exit;
    }

    // execute command.
    switch ($command) {
        case 'start':
            if ($command2 === '-d') {
                self::$daemonize = true;
            }
            break;
        case 'stop':
            @unlink(self::$pidFile);
            writeln("$startFile is stoping ...");
            /**
            * Send stop signal to master process.
            * SIGTERM 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和处理.
            * 通常用来要求程序自己正常退出. shell命令kill缺省产生这个信号.
            */
            $masterPid && posix_kill($masterPid, SIGTERM);
            // Timeout.
            $timeout = 5;
            $startTime = time();
            // Check master process is still alive?
            while (1) {
                $masterIsAlive = $masterPid && posix_kill($masterPid, SIG_BLOCK);
                if ($masterIsAlive) {
                    // Timeout?
                    if (time() - $startTime >= $timeout) {
                        writeln("{$startFile} stop fail");
                        exit;
                    }
                    // Waiting amoment.
                    usleep(10000);
                    continue;
                }
                // Stop success.
                writeln("{$startFile} stop success");
                break;
            }
            exit(0);
        break;
        case 'reload':
            // 向主进程/管理进程发送SIGUSR1信号,将平稳地restart所有worker进程
            posi_kill($managerPid, SIGUSR1);
            writeln("{$startFile} reload");
            exit;
        case 'restart':
            @unlink(self::$pidFile);
            writeln("{$startFile} is stoping ...");
            // Send stop signal to master process.
            $masterPid && posix_kill($masterPid, SIGTERM);
            // Timeout.
            $timeout = 5;
            $startTime = time();
            // Check master process is still alive?
            while (1) {
                $masterIsAlive = $masterPid && posix_kill($masterPid, SIG_BLOCK);
                if ($masterIsAlive) {
            // Timeout?
                    if (time() - $startTime >= $timeout) {
                            writeln("{$startFile} stop fail");
                            exit;
                    }
            // Waiting amoment.
                    usleep(10000);
                    continue;
                }
                // Stop success.
                writeln("{$startFile} stop success");
                break;
            }
            self::$daemonize = true;
            break;
        case 'test':
            self::$testUnity = true;
            self::$testUnityDir = $command2;
            break;
        default:
    }
}
php-msf/src/HttpServer.php
public function onRequest($request, $response)
{
    do {
    
        // 如果构造是用了yield协程
        if ($init instanceof \Generator) {
            $this->scheduler->start(
                $init,
                $instance,
                function () use ($instance, $methodName) {
                    // 协程回调,再次执行method方法
                    $generator = $instance->$methodName(...array_values($this->route->getParams()));
                    
                    //......
                }
            );
        }
        
    // 资深的 C 语言用户可能熟悉另一种不同的 do-while 循环用法,把语句放在
    // do-while(0) 之中,在循环内部用 break 语句来结束执行循环。达到go语法的特征。
    } while (0);
}
php-msf/src/MSFServer.php
/**
* 异步Task任务回调
* 在task_worker进程内被调用。worker进程可以使用swoole_server->task函数向task_worker进程投递新的任务
* $task_id是任务ID,由swoole扩展内自动生成,用于区分不同的任务。
* $task_id和$src_worker_id组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同
*
* @param \swoole_server $serv
* @param int $taskId Task ID
* @param int $fromId 来自于哪个worker进程
* @param array $data 任务的内容
* @return mixed|null
* @throws Exception
*/
public function onTask($serv, $taskId, $fromId, $data)
{

}
php-msf/src/Process/Inotify.php
<?php
/**
* 监控目录
*/
public function inotify()
{
    $this->inotifyFd = inotify_init();

    stream_set_blocking($this->inotifyFd, 0);
    $dirIterator  = new \RecursiveDirectoryIterator($this->monitorDir);
    $iterator     = new \RecursiveIteratorIterator($dirIterator);
    $monitorFiles = [];
    $tempFiles    = [];

    /**
    * 循环遍历项目中的所有文件
    * 对包含PHP文件的目录进行监听
    */
    foreach ($iterator as $file) {
        $fileInfo = pathinfo($file);

        if (!isset($fileInfo['extension']) || $fileInfo['extension'] != 'php') {
            continue;
        }

        //改为监听目录
        $dirPath = $fileInfo['dirname'];
        if (!isset($tempFiles[$dirPath])) {
            $wd = inotify_add_watch($this->inotifyFd, $fileInfo['dirname'], IN_MODIFY | IN_CREATE | IN_IGNORED | IN_DELETE);
            $tempFiles[$dirPath] = $wd;
            $monitorFiles[$wd] = $dirPath;
        }
    }

    $tempFiles = null;

    /**
    * swoole_event_add
    * 参数2为可读回调函数
    * 在Server程序中使用,可以理解为在worker/taskworker进程中将此socket注册到epoll事件中。
    * 在Client程序中使用,可以理解为在客户端进程中将此socket注册到epoll事件中。
    */
    swoole_event_add($this->inotifyFd, function ($inotifyFd) use (&$monitorFiles) {
        $events = inotify_read($inotifyFd);
        $flag = true;
        foreach ($events as $ev) {
            if (pathinfo($ev['name'], PATHINFO_EXTENSION) != 'php') {
                //创建目录添加监听
                if ($ev['mask'] == 1073742080) {
                    ) {
                    $path = $monitorFiles[$ev['wd']] .'/'. $ev['name'];

                    $wd = inotify_add_watch($inotifyFd, $path, IN_MODIFY | IN_CREATE | IN_IGNORED | IN_DELETE);
                    $monitorFiles[$wd] = $path;
                    }
                    $flag = false;
                    continue;
                }
                writeln('RELOAD ' . $monitorFiles[$ev['wd']] .'/'. $ev['name'] . ' update');
            }
            if ($flag == true) {
                $this->MSFServer->server->reload();
            }
        }, null, SWOOLE_EVENT_READ);
    }
}
php-msf/src/Task/Task.php(Task 异步任务)
  • 在worker进程通过TaskProxy代理执行请求
  • Worker进程只是将任务投递给Tasker进程后立即返回,即是非阻塞的投递
  • Tasker进程执行相应的业务逻辑,在这里就是从MongoDB获取新的一个ID
  • Worker进程是通过协程获取到Tasker执行结果,即调用需要加yield关键字
php-msf/src/Task/TaskProxy.php
  • 通过协程机制,使用CTask将task写入task_worder池