示例项目
使用msf构建业务应用的推荐方式,主要目录结构为
.
├── README.md
├── app // PHP业务代码
│ ├── AppServer.php // 应用server类,可根据需求自定义
│ ├── Controllers // 控制器类目录
│ ├── Lib // 特殊逻辑处理类目录
│ ├── Models // Model类目录
│ ├── Route // 特殊路由规则**类**目录
│ ├── Tasks // Task类目录
│ └── Views // 视图文件目录
├── build.sh // 构建脚本(拉取docker镜像,启动容器)
├── checkstyle.sh // 代码检查脚本
├── composer.json // composer包依赖配置文件
├── config // 配置目录
│ ├── check.php // 代码检查配置
│ ├── server.php // 主配置文件(server服务相关)
│ ├── constant.php // 业务常量定义文件
│ ├── log.php // 全局日志配置
│ ├── http.php // HTTP服务配置
│ ├── params.php // 全局业务配置(和运行环境无关)
│ ├── dev // 研发联调环境特殊配置目录
│ ├── docker // docker环境特殊配置目录
│ ├── product // 生产环境特殊配置目录
│ ├── qa // QA环境特殊配置目录
├── server.php // server启动脚本
├── console.php // 命令行脚本
├── test // 单元测试目录
└── www // server根目录
└── index.html // 索引文件
master分支(框架源代码)
框架源码,支持composer安装,具有composer包管理的所有特性,仅为框架代码,不包含任何业务逻辑,主要目录结构为
.
├── Base // 核心
│ ├── AOPFactory.php //切片类工厂
│ ├── Child.php // 基类
│ ├── Core.php // 内核基类
│ ├── Input.php // 用户输入对象
│ └── Output.php // 用户输出对象
│ └── Pool.php // 通用对象池类
├── Client // 客户端
│ ├── ConcurrentClient.php // flex风格的并行请求
│ ├── Http // HTTP客户端
│ │ └── Client.php // HTTP客户端实现
│ └── RpcClient.php // RPC Client
├── Console // 命令行
│ ├── Controller.php // 命令行控制器基类
│ └── Request.php // 命令行请求对象
├── Controllers // Web控制器
│ ├── Controller.php // web请求控制器基类
│ ├── Monitor.php // 内存服务器运行状态控制器
│ ├── Rest.php // Restful风格的API控制器示例
│ └── Rpc.php // 处理RPC请求控制器
├── Coroutine // 协程
│ ├── Base.php // IO协程基类
│ ├── CException.php // 协程异常
│ ├── CNull.php // 协程NULL值
│ ├── CTask.php // Task协程
│ ├── Dns.php // DNS协程
│ ├── Http.php // HTTP请求协程
│ ├── IBase.php // IO协程接口类
│ ├── MySql.php // MySQL协程
│ ├── Redis.php // Redis协程
│ ├── Scheduler.php // 协程调度器
│ └── Task.php // 协程类
├── Helpers // 辅助工具
│ ├── Common.php // 全局函数
│ └── Context.php // 请求上下文
├── HttpServer.php // Http Sever类
├── Marco.php // 全局类常量
├── Models // 模型
│ └── Model.php // 模型基类
├── MSFCli.php // Cli命令行
├── MSFServer.php // MSFServer服务类
├── Pack // 打包
│ ├── IPack.php // 打包基类
│ ├── JsonPack.php // Json
│ ├── MsgPack.php // MsgPack
│ └── SerializePack.php // Serialize
├── Pools // 连接池
│ ├── AsynPoolManager.php // 连接池管理器
│ ├── AsynPool.php // 连接池
│ ├── CoroutineRedisProxy.php // Redis连接池的协程辅助类
│ ├── IAsynPool.php // 连接池接口类
│ ├── Miner.php // MySQL Query Builder
│ ├── MysqlAsynPool.php // 异步MySQL连接池
│ └── RedisAsynPool.php // 异步Redis连接池
├── Process // 用户自定义进程
│ ├── Config.php // 配置管理进程
│ ├── Inotify.php // 文件监控进程
│ ├── ProcessBase.php // 自定义进程基类
│ └── Timer.php // 用户自定义定时器进程
├── Proxy // 代理
│ ├── IProxy.php // 代理接口
│ ├── RedisProxyCluster.php // Redis分布式代理类
│ ├── RedisProxyFactory.php // Redis代理工厂类
│ └── RedisProxyMasterSlave.php // Redis主从代理类
├── Rest // Restful
│ ├── Controller.php // Restful风格的API控制器基类
│ └── Serializer.php // Restful相关分页、序列化类
├── Route // 路由
│ ├── IRoute.php // 路由接口类
│ ├── NormalRoute.php // 通用路由类
│ └── RestRoute.php // Rest路由类
├── Server.php // Server基类
├── Tasks // Task
│ ├── MongoDbTask.php // MongoDB Task
│ ├── Task.php // Task基类
│ └── TaskProxy.php // Task Proxy
└── Views // 视图
├── error_404.php
└── test.html
RESTful
MSF 原生支持RESTful风格api,提供GET/POST/PUT/PATCH/HEAD/OPTIONS/DELETE动作的支持。
verb 介绍
'GET', // 从服务器取出资源(一项或多项)
'POST', // 在服务器新建一个资源
'PUT', // 在服务器更新资源(客户端提供改变后的完整资源)
'PATCH', // 在服务器更新资源(客户端提供改变的属性)
'DELETE', // 从服务器删除资源
'HEAD', // 获取 head 元数据
'OPTIONS', // 获取信息,关于资源的哪些属性是客户端可以改变的
MSF实现RESTful程序
Rest |- Controller.php 控制器
|- Route.php 路由器
使用方式
- 在配置文件中配置路由器为:
$config['server']['route_tool'] = '\\PG\\MSF\\Route\\RestRoute'
- 配置URL路由规则
- 控制器继承
PG\MSF\Rest\Controller
推荐控制器接收不同动作映射
'PUT,PATCH {id}' => 'update', // 更新资源,如:/users/<id>
'DELETE {id}' => 'delete', // 删除资源,如:/users/<id>
'GET,HEAD {id}' => 'view', // 查看资源单条数据,如:/users/<id>
'POST' => 'create', // 新建资源,如:/users
'GET,HEAD' => 'index', // 查看资源列表数据(可分页),如:/users
'{id}' => 'options', // 查看资源所支持的HTTP动词,如:/users/<id> | /users
'' => 'options',
URL路由配置
通过请求url中的path和动作类型即可路由到对应控制器下的某个方法(method)。URL路由配置支持正则方式,在url的path中可携带参数。例如:
$config['rest']['route']['rules'] = [
'GET,POST /groups' => '/account/profile',
'GET /users/ask' => 'user/apply',
'GET /users' => 'user/index',
'GET /users/<uid:\d+>' => 'user/view',
'PUT /users/<method:\w+>' => 'user/<method>',
'DELETE /users/<uid:\d+>' => 'user/delete',
]
MSF协程调度器
MSF协程基于Generator/Yield,IO事件触发,是自主研发的调度器,它的核心思想是发生任何异步IO操作之后,程序的控制流就切换到其他请求,待异步IO可读之后,由程序自行调用调度器接口,进行一次调度,并响应请求。MSF协程完全抛弃了定时器每隔一定时间轮询任务的方案,使得调度器的性能更加接近原生的异步回调方式。
同步任务
同步任务即Task,用来做一些异步的慢速任务,比如发邮件、批量任何、任何不支持异步Client的服务调用等等。MSF框架是纯异步Server架构,也就是说任何耗时的任务的代码不能放到Worker进程中执行,所以我们利用Swoole的Task模块来完成耗时任务。
Task进程
同步任务代码是在Task进程中执行的,Task进程具有以下的特性:
- 同步阻塞
- 支持定时器
目前MSF框架封装了异步的Http Client、Redis Client、MySQL Client,除了这几种原生支持异步外,任何其他的非异步Client的网络IO均要封装成Task,比如MongoDB Task。
其他注点意
$response>end()函数
在swoole_http_server
中,当调用$response->end()
之后的代码是继续执行的,但不能多次调用
Task是否可使用异步函数以及协程调度?
- 不能使用异步函数。Task是同步任务,任务非异步的Client的网络IO都要封装成Task使用。
- Task是没有调度器的,task_worker进程只进程onTask函数的处理。
if (!$serv->taskworker) {//worker进程 $this->scheduler = new Scheduler(); self::setProcessTitle($this->config['server.process_title'] . '-Worker'); } else { self::setProcessTitle($this->config['server.process_title'] . '-Tasker'); }
销毁对象机制
- 每创建对象都要调用
__supportAutoDestroy
函数,用以生成类属性默认值待请求结束后重新初始化,以下属性不做重新初始化,永久使用直到被销毁- static修饰的成员属性
- public修饰的
__useCount
、__genTime
、__coreName
、__DSLevel
、__reflections
属性
自定义类如何使用框架指定函数创建对象?
核心文件说明
Flexihash\Flexihash
redis组件使用一致性算法的类库
php-aop/src/Wrapper
- AOP 包装器
- AOP包装库提供主要切面功能,注入前置及后置方法
- AOP适配器兼容协程适配对象
php-msf/src/Server.php
/**
* 解析命令行参数
*
* @return void
*/
protected static function parseCommand()
{
global $argv;
// Check argv;
$startFile = $argv[0];
if (!isset($argv[1])) {
$argv[1] = 'start';
}
// Get command.
$command = trim($argv[1]);
$command2 = isset($argv[2]) ? $argv[2] : '';
// Start command.
$mode = '';
if (file_exists(self::$pidFile)) {
$pids = explode(',', file_get_contents(self::$pidFile));
// Get master process PID.
$masterPid = $pids[0];
$managerPid = $pids[1];
/**
* SIG_BLOCK
* 按照参数 set 提供的屏蔽字,屏蔽信号。
* 并将原信号屏蔽保存到oldset中。
*/
$masterIsAlive = $masterPid && @posix_kill($masterPid, SIG_BLOCK);
} else {
$masterIsAlive = false;
}
// Master is still alive?
if ($masterIsAlive) {
if ($command === 'start' || $command === 'test') {
writeln("{$startFile} already running");
exit;
}
} elseif ($command !== 'start' && $command !== 'test') {
writeln("{$startFile} not run");
exit;
}
// execute command.
switch ($command) {
case 'start':
if ($command2 === '-d') {
self::$daemonize = true;
}
break;
case 'stop':
@unlink(self::$pidFile);
writeln("$startFile is stoping ...");
/**
* Send stop signal to master process.
* SIGTERM 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和处理.
* 通常用来要求程序自己正常退出. shell命令kill缺省产生这个信号.
*/
$masterPid && posix_kill($masterPid, SIGTERM);
// Timeout.
$timeout = 5;
$startTime = time();
// Check master process is still alive?
while (1) {
$masterIsAlive = $masterPid && posix_kill($masterPid, SIG_BLOCK);
if ($masterIsAlive) {
// Timeout?
if (time() - $startTime >= $timeout) {
writeln("{$startFile} stop fail");
exit;
}
// Waiting amoment.
usleep(10000);
continue;
}
// Stop success.
writeln("{$startFile} stop success");
break;
}
exit(0);
break;
case 'reload':
// 向主进程/管理进程发送SIGUSR1信号,将平稳地restart所有worker进程
posi_kill($managerPid, SIGUSR1);
writeln("{$startFile} reload");
exit;
case 'restart':
@unlink(self::$pidFile);
writeln("{$startFile} is stoping ...");
// Send stop signal to master process.
$masterPid && posix_kill($masterPid, SIGTERM);
// Timeout.
$timeout = 5;
$startTime = time();
// Check master process is still alive?
while (1) {
$masterIsAlive = $masterPid && posix_kill($masterPid, SIG_BLOCK);
if ($masterIsAlive) {
// Timeout?
if (time() - $startTime >= $timeout) {
writeln("{$startFile} stop fail");
exit;
}
// Waiting amoment.
usleep(10000);
continue;
}
// Stop success.
writeln("{$startFile} stop success");
break;
}
self::$daemonize = true;
break;
case 'test':
self::$testUnity = true;
self::$testUnityDir = $command2;
break;
default:
}
}
php-msf/src/HttpServer.php
public function onRequest($request, $response)
{
do {
// 如果构造是用了yield协程
if ($init instanceof \Generator) {
$this->scheduler->start(
$init,
$instance,
function () use ($instance, $methodName) {
// 协程回调,再次执行method方法
$generator = $instance->$methodName(...array_values($this->route->getParams()));
//......
}
);
}
// 资深的 C 语言用户可能熟悉另一种不同的 do-while 循环用法,把语句放在
// do-while(0) 之中,在循环内部用 break 语句来结束执行循环。达到go语法的特征。
} while (0);
}
php-msf/src/MSFServer.php
/**
* 异步Task任务回调
* 在task_worker进程内被调用。worker进程可以使用swoole_server->task函数向task_worker进程投递新的任务
* $task_id是任务ID,由swoole扩展内自动生成,用于区分不同的任务。
* $task_id和$src_worker_id组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同
*
* @param \swoole_server $serv
* @param int $taskId Task ID
* @param int $fromId 来自于哪个worker进程
* @param array $data 任务的内容
* @return mixed|null
* @throws Exception
*/
public function onTask($serv, $taskId, $fromId, $data)
{
}
php-msf/src/Process/Inotify.php
<?php
/**
* 监控目录
*/
public function inotify()
{
$this->inotifyFd = inotify_init();
stream_set_blocking($this->inotifyFd, 0);
$dirIterator = new \RecursiveDirectoryIterator($this->monitorDir);
$iterator = new \RecursiveIteratorIterator($dirIterator);
$monitorFiles = [];
$tempFiles = [];
/**
* 循环遍历项目中的所有文件
* 对包含PHP文件的目录进行监听
*/
foreach ($iterator as $file) {
$fileInfo = pathinfo($file);
if (!isset($fileInfo['extension']) || $fileInfo['extension'] != 'php') {
continue;
}
//改为监听目录
$dirPath = $fileInfo['dirname'];
if (!isset($tempFiles[$dirPath])) {
$wd = inotify_add_watch($this->inotifyFd, $fileInfo['dirname'], IN_MODIFY | IN_CREATE | IN_IGNORED | IN_DELETE);
$tempFiles[$dirPath] = $wd;
$monitorFiles[$wd] = $dirPath;
}
}
$tempFiles = null;
/**
* swoole_event_add
* 参数2为可读回调函数
* 在Server程序中使用,可以理解为在worker/taskworker进程中将此socket注册到epoll事件中。
* 在Client程序中使用,可以理解为在客户端进程中将此socket注册到epoll事件中。
*/
swoole_event_add($this->inotifyFd, function ($inotifyFd) use (&$monitorFiles) {
$events = inotify_read($inotifyFd);
$flag = true;
foreach ($events as $ev) {
if (pathinfo($ev['name'], PATHINFO_EXTENSION) != 'php') {
//创建目录添加监听
if ($ev['mask'] == 1073742080) {
) {
$path = $monitorFiles[$ev['wd']] .'/'. $ev['name'];
$wd = inotify_add_watch($inotifyFd, $path, IN_MODIFY | IN_CREATE | IN_IGNORED | IN_DELETE);
$monitorFiles[$wd] = $path;
}
$flag = false;
continue;
}
writeln('RELOAD ' . $monitorFiles[$ev['wd']] .'/'. $ev['name'] . ' update');
}
if ($flag == true) {
$this->MSFServer->server->reload();
}
}, null, SWOOLE_EVENT_READ);
}
}
php-msf/src/Task/Task.php(Task 异步任务)
- 在worker进程通过TaskProxy代理执行请求
- Worker进程只是将任务投递给Tasker进程后立即返回,即是非阻塞的投递
- Tasker进程执行相应的业务逻辑,在这里就是从MongoDB获取新的一个ID
- Worker进程是通过协程获取到Tasker执行结果,即调用需要加yield关键字
php-msf/src/Task/TaskProxy.php
- 通过协程机制,使用CTask将task写入task_worder池