/ yii2

yii2-queue

https://github.com/yiisoft/yii2-queue
https://github.com/yiisoft-contrib/yiiframework.com

通过这个队列扩展可以异步执行任务。Yii2 高并发业务的实现。

在软件的正常功能开发中,并不需要去刻意的寻找消息队列的使用场景,而是当出现性能瓶颈时,去查看业务逻辑是否存在可以异步处理的耗时操作,如果存在的话便可以引入消息队列来解决。否则盲目的使用消息队列可能会增加维护和开发的成本却无法得到可观的性能提升,那就得不偿失了。

应用场景

应用场景:每次有新产品发布的时候,给所有老用户每人发送一条模板消息(短信或者邮件)。

但是我如果在发布新产品的时候就一次性发送模板消息有可能导致速度问题和被封的危险。所以我想将接收者们在时间上分开,比如100个人,每5-10秒发送一个,我用yii2-queue来实现它。

安装 Queue 扩展

https://github.com/yiisoft/yii2-queue

composer require --prefer-dist yiisoft/yii2-queue

使用 DataBase

common 配置文件:

return [
    'components' => [
        'db' => [
            'class' => 'yii\db\Connection',
            'charset' => 'utf8mb4',
            'dsn' => 'mysql:host=localhost;dbname=blog',
            'username' => 'root',
            'password' => 'password',
        ],
        'queue' => [
            'class' => \yii\queue\db\Queue::class,
            'db' => 'db', // DB connection component or its config
            'tableName' => '{{%queue}}', // Table name
            'channel' => 'default', // Queue channel key
            'mutex' => \yii\mutex\MysqlMutex::class, // Mutex used to sync queries
        ],
    ],
];

console 配置文件:

return [
    // 我们将queue加到预加载属性bootstrap中,这样console 就有了命令 ./yii queue
    'bootstrap' => ['log', 'queue'],
    'controllerMap' => [
        // ...
        'migrate' => [
            'class' => 'yii\console\controllers\MigrateController',
            'migrationPath' => null,
            'migrationNamespaces' => [
                'yii\queue\db\migrations',
            ],
        ],
    ],
];

然后使用迁移命令生成 queue 表:

./yii migrate/up

使用 queue 相关命令:

./yii queue/info

This command obtains and executes tasks in a loop until the queue is empty:

./yii queue/run

This command launches a daemon which infinitely queries the queue:

./yii queue/listen

So, queue/listen should run as a daemon or queue/run as a cronjob.

新建 jobs

frontend/jobs;

class DownloadJob extends BaseObject implements \yii\queue\JobInterface
{
    public $url;
    public $file;
    
    public function execute($queue)
    {
        file_put_contents($this->file, file_get_contents($this->url));
    }
}

将任务推送到队列中

// Push a job into queue and get massage ID.  
$id = Yii::$app->queue->push(new frontend\components\DownloadJob([
    'url' => 'http://example.com/image.jpg',
    'file' => '/tmp/image.jpg',
]));

// Check whether the job is waiting for execution.
Yii::$app->queue->isWaiting($id);

// Check whether a worker got the job from the queue and executes it.
Yii::$app->queue->isReserved($id);

// Check whether a worker has executed the job.
Yii::$app->queue->isDone($id);

或者加一点延迟:

Yii::$app->queue->delay(5 * 60)->push(new frontend\components\DownloadJob([
    'url' => 'http://example.com/image.jpg',
    'file' => '/tmp/image.jpg',
]));

Systemd 创建守护进程消费队列

sudo vim /etc/systemd/system/yii-queue@.service
[Unit]
Description=Yii Queue Worker %I
After=network.target
# the following two lines only apply if your queue backend is mysql
# replace this with the service that powers your backend
After=mysql.service
Requires=mysql.service

[Service]
User=www-data
Group=www-data
ExecStart=/usr/bin/php /var/www/项目目录/yii queue/listen --verbose
Restart=on-failure

[Install]
WantedBy=multi-user.target
  • apache 的 user 和 group 为 www-data
  • nginx 的 user 和 group 为 nginx

You need to reload systemd in order to re-read its configuration:

sudo systemctl daemon-reload

Set of commands to control workers:

# To start two workers
sudo systemctl start yii-queue@1 yii-queue@2

# To get the status of running workers
sudo systemctl status "yii-queue@*"

# To stop a specific worker
sudo systemctl stop yii-queue@2

# To stop all running workers
sudo systemctl stop "yii-queue@*"

# To start two workers at system boot
sudo systemctl enable yii-queue@1 yii-queue@2

使用 Redis

使用 redis 作为 queue 的介质。
安装 Reids 扩展:https://github.com/yiisoft/yii2-redis

composer require --prefer-dist yiisoft/yii2-redis:"~2.0.0"
return [
    'components' => [
        'redis' => [
            'class' => 'yii\redis\Connection',
            'hostname' => 'localhost',
            'port' => 6379,
            'database' => 0,
        ],
        'queue' => [
            'class' => \yii\queue\redis\Queue::class,
            'as log' => \yii\queue\LogBehavior::class,
            // Other driver options
        ],
    ],
];