在 Yii2 高级模板中基于 Yii2 队列扩展实现异步执行任务,附加事件处理器,在(每次成功执行作业后、在作业执行期间发生未捕获的异常时)
1、通过配置附加事件处理器,编辑 \environments\dev\common\config\main-local.php、\environments\prod\common\config\main-local.php,如图1
'copyAssetQueue' => [ // 复制资源文件队列 'class' => 'yii\queue\redis\Queue', 'redis' => 'redis', // Redis 连接组件或它的配置 'channel' => 'cpa:queue:copy:asset', // 队列键前缀 'ttr' => 10 * 60, // 作业处理的最长时间,单位(秒) 'on afterExec' => ['common\components\queue\CopyAssetEventHandler', 'afterExec'], // 每次成功执行作业后 'on afterError' => ['common\components\queue\CopyAssetEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时 'as log' => 'yii\queue\LogBehavior', ], 'uploadAssetQueue' => [ // 上传资源文件队列 'class' => 'yii\queue\redis\Queue', 'redis' => 'redis', // Redis 连接组件或它的配置 'channel' => 'cpa:queue:upload:asset', // 队列键前缀 'ttr' => 5 * 60, // 作业处理的最长时间,单位(秒) 'on afterExec' => ['common\components\queue\UploadAssetEventHandler', 'afterExec'], // 每次成功执行作业后 'on afterError' => ['common\components\queue\UploadAssetEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时 'as log' => 'yii\queue\LogBehavior', ], 'pubArticleQueue' => [ // 发布文章队列 'class' => 'yii\queue\redis\Queue', 'redis' => 'redis', // Redis 连接组件或它的配置 'channel' => 'cpa:queue:pub:article', // 队列键前缀 'ttr' => 5 * 60, // 作业处理的最长时间,单位(秒) 'on afterExec' => ['common\components\queue\PubArticleEventHandler', 'afterExec'], // 每次成功执行作业后 'on afterError' => ['common\components\queue\PubArticleEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时 'as log' => 'yii\queue\LogBehavior', ], 'sourceCallbackQueue' => [ // 来源回调队列 'class' => 'yii\queue\redis\Queue', 'redis' => 'redis', // Redis 连接组件或它的配置 'channel' => 'cpa:queue:source:callback', // 队列键前缀 'ttr' => 5 * 60, // 作业处理的最长时间,单位(秒) 'on afterExec' => ['common\components\queue\SourceCallbackEventHandler', 'afterExec'], // 每次成功执行作业后 'on afterError' => ['common\components\queue\SourceCallbackEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时 'as log' => 'yii\queue\LogBehavior', ],
2、编辑 \qq\rests\qq_cw_app\IndexAction.php,一个推送队列的入口
$data = [ 'channel_id' => 2, // 渠道ID 'channel_code' => 'wx', // 渠道代码,qq:企鹅号;wx:微信公众帐号 'channel_type_id' => 3, // 渠道的类型ID 'channel_type_code' => 'wx', // 渠道的类型代码,qq_cw:企鹅号的内容网站应用;qq_tp:企鹅号的第三方服务平台应用;wx:微信公众帐号应用 'source' => 'spider', // 来源,xContent:内容库;vms:视频管理系统;cms:内容管理系统;spider:自媒体 'task_id' => 2, // 任务ID ]; $assets = [ [ 'type' => 'image', 'channel_article_id' => 1, 'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/images/1.png', ], [ 'type' => 'video', 'channel_article_id' => 1, 'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/videos/7月份北上广深等十大城市租金环比上涨 看东方 20180820 高清_高清.mp4', ], ]; $assetServiceCopyAssetsAsyncResult = AssetService::copyAssetsAsync($data, $assets); print_r($assetServiceCopyAssetsAsyncResult); exit;
3、编辑 \common\services\AssetService.php,复制来源的资源文件至渠道发布的资源目录,队列任务执行成功后,调用相应服务,否则,插入发布日志(异步)
/** * 复制来源的资源文件至渠道发布的资源目录,队列任务执行成功后,调用相应服务,否则,插入发布日志(异步) * @param array $data 数据 * 格式如下: * [ * 'channel_id' => 1, // 渠道ID * 'channel_code' => 'qq', // 渠道代码,qq:企鹅号;wx:微信公众帐号 * 'channel_type_id' => 1, // 渠道的类型ID * 'channel_type_code' => 'qq_cw', // 渠道的类型代码,qq_cw:企鹅号的内容网站应用;qq_tp:企鹅号的第三方服务平台应用;wx:微信公众帐号应用 * 'source' => 'spider', // 来源,xContent:内容库;vms:视频管理系统;cms:内容管理系统;spider:自媒体 * 'task_id' => 1, // 任务ID * ] * * @param array $assets 来源的资源文件的绝对URL * 格式如下: * [ * [ * 'type' => 'image', // 资源文件的类型,image:图片;video:视频 * 'channel_article_id' => 1, // 渠道的文章ID * 'absolute_url' => 'http://localhost/spider/storage/spider/images/1.png', // 来源的资源文件的绝对URL * ], * [ * 'type' => 'video', // 资源文件的类型,image:图片;video:视频 * 'channel_article_id' => 1, // 渠道的文章ID * 'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/videos/7月份北上广深等十大城市租金环比上涨 看东方 20180820 高清_高清.mp4', // 来源的资源文件的绝对URL * ], * ] * * @return array $channelPubApiAssetAbsolutePaths 渠道发布的资源文件的相对路径 * 格式如下: * [ * [ * 'type' => 'image', * 'channel_article_id' => 1, * 'relative_path' => '/2018/09/20/1537439889.2333.1441541478.png', * ], * [ * 'type' => 'video', * 'channel_article_id' => 1, * 'relative_path' => '/2018/09/20/1537439889.2403.62871268.mp4', * ], * ] * * @throws Exception execution failed */ public static function copyAssetsAsync($data, $assets) { $assetData = []; $time = time(); foreach ($assets as $key => $asset) { $assetData[] = [ 'channel_id' => $data['channel_id'], 'channel_code' => $data['channel_code'], 'channel_type_id' => $data['channel_type_id'], 'channel_type_code' => $data['channel_type_code'], 'source' => $data['source'], 'type' => $asset['type'], 'absolute_url' => $asset['absolute_url'], 'relative_path' => '', 'size' => 0, 'task_id' => $data['task_id'], 'channel_article_id' => $asset['channel_article_id'], 'status' => Asset::STATUS_ENABLED, 'is_deleted' => Asset::IS_DELETED_NO, 'created_at' => $time, 'updated_at' => $time, 'deleted_at' => Asset::DELETED_AT_DEFAULT, ]; } // 批量创建资源 $asset = new Asset(); $assetCreateMultipleResult = $asset->createMultiple($assetData); // 将任务发送到队列(复制资源文件队列),通过标准工作人员进行处理 Yii::$app->copyAssetQueue->push(new CopyAssetJob([ 'taskId' => $data['task_id'], ])); }
4、编辑 \common\jobs\CopyAssetJob.php,队列的任务类
<?php /** * Created by PhpStorm. * User: Qiang Wang * Date: 2018/10/22 * Time: 17:10 */ namespace common\jobs; use Yii; use common\logics\Asset; use common\services\TaskService; use common\services\AssetService; use yii\web\ServerErrorHttpException; /** * 复制来源的资源文件至渠道发布的资源目录 * * @author Qiang Wang <shuijingwanwq@163.com> * @since 1.0 */ class CopyAssetJob extends Job { public $taskId; /* * @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常 */ public function execute($queue) { // 基于ID查找状态为启用的单个数据模型(任务) $taskEnabledItem = TaskService::findModelEnabledById($this->taskId); // 基于任务ID查找状态为启用的资源列表 $assetEnabledItems = Asset::findAllEnabledByTaskId($this->taskId); if (empty($assetEnabledItems)) { throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020); } $source = $taskEnabledItem->source; $assets = []; foreach ($assetEnabledItems as $assetEnabledItem) { $assets[] = [ 'type' => $assetEnabledItem->type, 'absolute_url' => $assetEnabledItem->absolute_url, ]; } // 复制来源的资源文件至渠道发布的资源目录,返回相对路径(同步) $assetServiceCopyAssetsSyncResult = AssetService::copyAssetsSync($source, $assets); foreach ($assetEnabledItems as $key => $assetEnabledItem) { $assetEnabledItem->relative_path = $assetServiceCopyAssetsSyncResult[$key]['relative_path']; // 取得文件大小,单位(字节) $assetEnabledItem->size = filesize(Yii::$app->params['channelPubApi']['asset'][$assetEnabledItem->type]['basePath'] . $assetServiceCopyAssetsSyncResult[$key]['relative_path']); $assetEnabledItems[$key] = $assetEnabledItem; } // 批量更新资源 $assetEnabledItem->updateMultiple($assetEnabledItems); } }
5、编辑 \common\components\queue\CopyAssetEventHandler.php,在配置中所定义的事件处理器,当复制资源文件队列,每次成功执行作业后(afterExec),将调用相应服务进行后续处理,即推送任务至上传资源文件队列;当复制资源文件队列,在作业执行期间发生未捕获的异常时(afterError),插入发布日志,将作业推送至来源回调队列
<?php /** * Created by PhpStorm. * User: Qiang Wang * Date: 2018/10/23 * Time: 14:23 */ namespace common\components\queue; use Yii; use common\logics\ChannelAppTask; use common\logics\PubLog; use common\services\PubLogService; use common\services\TaskService; use common\services\SourceCallbackService; use yii\helpers\Json; use yii\base\Component; use yii\queue\ExecEvent; use yii\web\NotFoundHttpException; use yii\web\UnprocessableEntityHttpException; use yii\web\ServerErrorHttpException; use yii\db\Exception; /** * Class CopyAssetEventHandler * @package common\components\queue * * @author Qiang Wang <shuijingwanwq@163.com> * @since 1.0 */ class CopyAssetEventHandler extends Component { /** * @param ExecEvent $event * @throws NotFoundHttpException 如果未找到数据模型,将抛出 404 HTTP 异常 * @throws UnprocessableEntityHttpException 如果找到数据模型,状态未启用,将抛出 422 HTTP 异常 * @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常 * @throws Exception execution failed */ public static function afterExec(ExecEvent $event) { $taskId = $event->job->taskId; // 基于ID查找状态为启用的单个数据模型(任务) $taskEnabledItem = TaskService::findModelEnabledById($taskId); // 基于任务ID查找状态为启用的资源列表 $channelAppTaskEnabledItems = ChannelAppTask::findAllEnabledByTaskId($taskId); if (empty($channelAppTaskEnabledItems)) { throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35021'), ['task_id' => $taskId])), 35021); } try { // 调用相应服务进行后续处理 $serviceClass = 'common\services\\' . str_replace(' ', '', ucwords(str_replace('_', ' ', $taskEnabledItem->channel_type_code))) . 'AssetService'; // 例:common\services\QqCwAssetService $serviceAction = 'copyAssetExecHandler'; $serviceClass::$serviceAction($taskEnabledItem->id); } catch (\Throwable $e) { $pubLogData = []; $time = time(); foreach ($channelAppTaskEnabledItems as $channelAppTaskEnabledItem) { $pubLogData[] = [ 'channel_id' => $channelAppTaskEnabledItem['channel_id'], 'channel_code' => $channelAppTaskEnabledItem['channel_code'], 'channel_type_id' => $channelAppTaskEnabledItem['channel_type_id'], 'channel_type_code' => $channelAppTaskEnabledItem['channel_type_code'], 'task_id' => $channelAppTaskEnabledItem['task_id'], 'channel_app_task_id' => $channelAppTaskEnabledItem['id'], 'channel_app_task_uuid' => $channelAppTaskEnabledItem['uuid'], 'code' => $e->getCode(), 'message' => $e->getMessage(), 'data' => Json::encode([]), 'status' => PubLog::STATUS_ERROR, 'is_deleted' => PubLog::IS_DELETED_NO, 'created_at' => $time, 'updated_at' => $time, 'deleted_at' => PubLog::DELETED_AT_DEFAULT, ]; } // 发布任务成功后,调用相应服务失败,插入发布日志,将作业推送至来源回调队列(异步) SourceCallbackService::errorAsync($pubLogData); } } /** * @param ExecEvent $event * @throws NotFoundHttpException 如果未找到数据模型,将抛出 404 HTTP 异常 * @throws UnprocessableEntityHttpException 如果找到数据模型,状态未启用,将抛出 422 HTTP 异常 * @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常 * @throws Exception execution failed */ public static function afterError(ExecEvent $event) { $taskId = $event->job->taskId; // 基于ID查找状态为启用的单个数据模型(任务) $taskEnabledItem = TaskService::findModelEnabledById($taskId); // 基于任务ID查找状态为启用的资源列表 $channelAppTaskEnabledItems = ChannelAppTask::findAllEnabledByTaskId($taskId); if (empty($channelAppTaskEnabledItems)) { throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35021'), ['task_id' => $taskId])), 35021); } $pubLogData = []; $time = time(); foreach ($channelAppTaskEnabledItems as $channelAppTaskEnabledItem) { $pubLogData[] = [ 'channel_id' => $channelAppTaskEnabledItem['channel_id'], 'channel_code' => $channelAppTaskEnabledItem['channel_code'], 'channel_type_id' => $channelAppTaskEnabledItem['channel_type_id'], 'channel_type_code' => $channelAppTaskEnabledItem['channel_type_code'], 'task_id' => $channelAppTaskEnabledItem['task_id'], 'channel_app_task_id' => $channelAppTaskEnabledItem['id'], 'channel_app_task_uuid' => $channelAppTaskEnabledItem['uuid'], 'code' => $event->error->getCode(), 'message' => $event->error->getMessage(), 'data' => Json::encode([]), 'status' => PubLog::STATUS_ERROR, 'is_deleted' => PubLog::IS_DELETED_NO, 'created_at' => $time, 'updated_at' => $time, 'deleted_at' => PubLog::DELETED_AT_DEFAULT, ]; } // 发布任务失败后,插入发布日志,将作业推送至来源回调队列(异步) SourceCallbackService::errorAsync($pubLogData); } }
6、打开网址:http://api.channel-pub-api.localhost/qq/v1/qq-cw-apps?group_id=spider ,推送任务至队列
7、info 命令打印关于队列状态的信息,复制资源文件队列中1个任务状态为等待,上传资源文件队列中0个任务状态为等待
.\yii copy-asset-queue/info --color=0 .\yii upload-asset-queue/info --color=0
Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0
8、run 命令获取并执行循环中的任务(复制资源文件队列),直到队列为空,复制资源文件队列中的任务成功执行后,将下一步的上传任务推送至上传资源文件队列,复制资源文件队列中0个任务状态为等待,复制资源文件队列中1个任务状态为完成,上传资源文件队列中1个任务状态为等待,如图2
.\yii copy-asset-queue/run --verbose=1 --isolate=1 --color=0 .\yii copy-asset-queue/info .\yii upload-asset-queue/info --color=0
2018-10-27 17:23:54 [pid: 5216] - Worker is started 2018-10-27 17:23:54 [1] common\jobs\CopyAssetJob (attempt: 1, pid: 5216) - Started 2018-10-27 17:23:55 [1] common\jobs\CopyAssetJob (attempt: 1, pid: 5216) - Done (0.249 s) 2018-10-27 17:23:55 [pid: 5216] - Worker is stopped (0:00:01) Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 1 Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0
9、编辑 \common\jobs\CopyAssetJob.php,队列的任务类,特意抛出一个异常,以测试当复制资源文件队列,在作业执行期间发生未捕获的异常时(afterError),插入发布日志,将作业推送至来源回调队列
<?php /** * Created by PhpStorm. * User: Qiang Wang * Date: 2018/10/22 * Time: 17:10 */ namespace common\jobs; use Yii; use common\logics\Asset; use common\services\TaskService; use common\services\AssetService; use yii\web\ServerErrorHttpException; /** * 复制来源的资源文件至渠道发布的资源目录 * * @author Qiang Wang <shuijingwanwq@163.com> * @since 1.0 */ class CopyAssetJob extends Job { public $taskId; /* * @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常 */ public function execute($queue) { // 基于ID查找状态为启用的单个数据模型(任务) $taskEnabledItem = TaskService::findModelEnabledById($this->taskId); // 基于任务ID查找状态为启用的资源列表 $assetEnabledItems = Asset::findAllEnabledByTaskId($this->taskId); throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020); if (empty($assetEnabledItems)) { throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020); } $source = $taskEnabledItem->source; $assets = []; foreach ($assetEnabledItems as $assetEnabledItem) { $assets[] = [ 'type' => $assetEnabledItem->type, 'absolute_url' => $assetEnabledItem->absolute_url, ]; } // 复制来源的资源文件至渠道发布的资源目录,返回相对路径(同步) $assetServiceCopyAssetsSyncResult = AssetService::copyAssetsSync($source, $assets); foreach ($assetEnabledItems as $key => $assetEnabledItem) { $assetEnabledItem->relative_path = $assetServiceCopyAssetsSyncResult[$key]['relative_path']; // 取得文件大小,单位(字节) $assetEnabledItem->size = filesize(Yii::$app->params['channelPubApi']['asset'][$assetEnabledItem->type]['basePath'] . $assetServiceCopyAssetsSyncResult[$key]['relative_path']); $assetEnabledItems[$key] = $assetEnabledItem; } // 批量更新资源 $assetEnabledItem->updateMultiple($assetEnabledItems); } }
10、清空 Redis,即清空队列中的数据,打开网址:http://api.channel-pub-api.localhost/qq/v1/qq-cw-apps?group_id=spider ,推送任务至队列
11、info 命令打印关于队列状态的信息,复制资源文件队列中1个任务状态为等待,上传资源文件队列中0个任务状态为等待,来源回调队列中0个任务状态为等待
.\yii copy-asset-queue/info --color=0 .\yii upload-asset-queue/info --color=0 .\yii source-callback-queue/info --color=0
Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0
12、run 命令获取并执行循环中的任务(复制资源文件队列),直到队列为空,复制资源文件队列中的任务执行失败(因为任务中有抛出未捕获的异常)后,将下一步的上传任务推送至上传资源文件队列,复制资源文件队列中0个任务状态为等待,复制资源文件队列中1个任务状态为完成,上传资源文件队列中0个任务状态为等待,来源回调队列中1个任务状态为等待,如图3
.\yii copy-asset-queue/run --verbose=1 --isolate=1 --color=0 .\yii copy-asset-queue/info --color=0 .\yii upload-asset-queue/info --color=0 .\yii source-callback-queue/info --color=0
2018-10-27 17:37:10 [pid: 32132] - Worker is started 2018-10-27 17:37:11 [1] common\jobs\CopyAssetJob (attempt: 1, pid: 32132) - Started 2018-10-27 17:37:11 [1] common\jobs\CopyAssetJob (attempt: 1, pid: 32132) - Error (0.232 s) > yii\web\ServerErrorHttpException: Based on task ID: 2, find the list of enabled resources is empty 2018-10-27 17:37:11 [pid: 32132] - Worker is stopped (0:00:01) Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 1 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0
13、查看系统日志表的异常信息,即 log 表中的 message,有助于后续开发人员的分析工作,如图4
[1] common\jobs\CopyAssetJob (attempt: 1, PID: 32132) is finished with error: yii\web\ServerErrorHttpException: Based on task ID: 2, find the list of enabled resources is empty in E:\wwwroot\channel-pub-api\common\jobs\CopyAssetJob.php:38 Stack trace: #0 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2-queue\src\Queue.php(214): common\jobs\CopyAssetJob->execute(Object(yii\queue\redis\Queue)) #1 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2-queue\src\cli\Queue.php(162): yii\queue\Queue->handleMessage('1', 'O:24:"common\\jo...', '600', '1') #2 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2-queue\src\cli\Command.php(145): yii\queue\cli\Queue->execute('1', 'O:24:"common\\jo...', '600', '1', '32132') #3 [internal function]: yii\queue\cli\Command->actionExec('1', '600', '1', '32132') #4 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\base\InlineAction.php(57): call_user_func_array(Array, Array) #5 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\base\Controller.php(157): yii\base\InlineAction->runWithParams(Array) #6 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\console\Controller.php(148): yii\base\Controller->runAction('exec', Array) #7 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\base\Module.php(528): yii\console\Controller->runAction('exec', Array) #8 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\console\Application.php(180): yii\base\Module->runAction('copy-asset-queu...', Array) #9 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\console\Application.php(147): yii\console\Application->runAction('copy-asset-queu...', Array) #10 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\base\Application.php(386): yii\console\Application->handleRequest(Object(yii\console\Request)) #11 E:\wwwroot\channel-pub-api\yii(23): yii\base\Application->run() #12 {main}.
14、发布任务失败后,插入发布日志,将作业推送至来源回调队列(异步),查看发布日志表中的 message,即 pub_log,此为业务数据,最终会显示给用户,如图5
Based on task ID: 2, find the list of enabled resources is empty
近期评论