php打造一个高并发的读写分离架构

我们在编写程序的时候经常会出现服务器挂了,宕机了,然后我们查看一下日志,发现要不是数据库连接数量过大,要不就是业务逻辑复杂导致响应超时,那么这个问题怎么解决呢,如何保护和核心数据库及核心对的业务逻辑不被用户的请求击穿。

想想苹果或小米的专门店在新品发布会后一旦遇到用户抢购,门店为了保持正常的秩序及运行,会设置网上预约及排队的规则,应为门店的员工数量有限,当大批用户来抢购时,他们也忙不过来,那么用户体验也非常差还会出错,那么门店就是一个小的系统,在it架构中,我们也可以采用这种方式来保护核心业务逻辑服务器及数据库的正常运行。

今天我们来用php打造一个高并发的读写分离架构

php打造一个高并发的读写分离<a href='/tag/arch.html'>架构</a>



本文采用php cli模式直连redis来获取sql数据缓存,那么即使数据库宕机了,缓存还在,缓存在什么情况下会失效呢,在用户对表进行写操作的时候,那么这个表的sql缓存都需要异步重新生成,异步sql生成缓存可以很好的保护数据库,防止缓存击穿数据库的现象发生,我们想看sql缓存的队列处理,这个队列处理可以用多进程来处理,比如实例化多个来进行并发处理。

<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$db = new PDO('mysql:host=localhost;dbname=guest', 'guest', 'p6yhG8iDRMNxdJ9w');
$sqlquenekey = 'sql_list'; //redis数
$db->setAttribute (\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);
$db->setAttribute (\PDO::ATTR_PERSISTENT, true);

function exesql($sql, $table) {
    global $redis;
    $redis->set("sql-".md5($sql),null);
    $_v = $redis->get("sql-".md5($sql));
    if ($_v == "") {
        addsqljob($sql, $table);
        return false;
    } else {
        return $_v;
    }

}

function addsqljob($sql, $table) {
    global $redis;
    global $sqlquenekey;
    $redis->lPush($sqlquenekey, serialize(["act" => "read", "data" => $sql]));
    if (is_array($table)) {
        foreach ($table as $item) {
            $redis->sadd('tablesql'.$item, $sql);
        }
    }
    // $redis->set("sql-".md5($sql), serialize());
}

function updatesql($sql, $table) {
    global $redis;
    global $sqlquenekey;
    $redis->lPush($sqlquenekey, serialize(["act" => "write", "data" => $table]));
  
    // $redis->set("sql-".md5($sql), serialize());
}


$timer1 = Swoole\Timer::tick(2000, function () {
    echo "开始获取sql\n";
    $_data = exesql("select * from User", ["User"]);
    echo "结束获取sql\n";
    echo json_encode($_data)."\n";
});

while (true) {
    $_data = $redis->lPop($sqlquenekey);
    if (!$_data || $_data == 'nil') {

        continue;
    }
    $_data = unserialize($_data);
    echo "有任务
";
    if ($_data['act'] == 'read') {
        $res = $db->prepare($_data['data']); //准备查询语句
        $res->execute();
        echo $_data['data']."\n";
        $allrows = $res->fetchAll(PDO::FETCH_ASSOC); //以关联下标从结果集中获取所有数据
        echo "select 完成任务\n";
        $redis->set("sql-".md5($_data['data']), serialize($allrows));
    }
    //更新这个表的sql
    if ($_data['act'] == 'write') {
        $_sqls = $redis->sMembers('tablesql'.$_data['data']);
     
        foreach ($_sqls as $_sql) {
            $res = $db->prepare($_sql); //准备查询语句
            $res->execute();
            echo "{$_sql}\n";
            $allrows = $res->fetchAll(PDO::FETCH_ASSOC); //以关联下标从结果集中获取所有数据

            $redis->set("sql-".md5($_sql), serialize($allrows));
        }
        echo "update selelct完成任务\n";

    }



}

好了,我们来写一个sql获取数据

<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$db = new PDO('mysql:host=localhost;dbname=guest', 'guest', 'p6yhG8iDRMNxdJ9w');
$sqlquenekey = 'sql_list'; //redis数
$db->setAttribute (\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);
$db->setAttribute (\PDO::ATTR_PERSISTENT, true);

function exesql($sql, $table) {
    global $redis;

    $_v = $redis->get("sql-".md5($sql));
    if ($_v == "") {
        addsqljob($sql, $table);
        return false;
    } else {
        return $_v;
    }

}

function addsqljob($sql, $table) {
    global $redis;
    global $sqlquenekey;
    $redis->lPush($sqlquenekey, serialize(["act" => "read", "data" => $sql]));
    if (is_array($table)) {
        foreach ($table as $item) {
            $redis->sadd('tablesql'.$item, $sql);
        }
    }
    // $redis->set("sql-".md5($sql), serialize());
}

function updatesql($sql, $table) {
    global $redis;
    global $sqlquenekey;
    $redis->lPush($sqlquenekey, serialize(["act" => "write", "data" => $table]));

    // $redis->set("sql-".md5($sql), serialize());
}


$timer1 = Swoole\Timer::tick(2000, function () {
    echo "开始获取sql\n";
    $_data = exesql("select * from User", ["User"]);
    echo "结束获取sql\n";
    echo json_encode($_data)."\n";
});
$timer2 = Swoole\Timer::tick(6000, function () {
    echo "开始更新sql\n";
    updatesql("select * from User", "User");
});

后期扩容只要增加缓存服务器及数据库服务器还有核心业务数据库即可,有效地保护了核心数据库被前台请求击穿的现象。

{{collectdata}}

网友评论0