从源码分析 MGR 的流控机制( 二 )


最后会通过is_flow_control_needed判断是否需要流控 。如果需要流控 , 则会将 m_holds_in_period 自增加 1 。
如果是 Debug 版本,且将 log_error_verbosity 设置为 3 。当需要流控时,会在错误日志中打印以下信息 。
[Note] [MY-011726] [Repl] Plugin group_replication reported: 'Flow control - update member stats: 127.0.0.1:33071 stats certifier_queue 0, applier_queue 20 certified 387797 (308), applied 387786 (289), local 0 (0), quota 400 (274) mode=1'什么时候会触发流控呢?
接下来我们看看 is_flow_control_needed 函数的处理逻辑 。
bool Pipeline_member_stats::is_flow_control_needed() {  return (m_flow_control_mode == FCM_QUOTA) &&         (m_transactions_waiting_certification >              get_flow_control_certifier_threshold_var() ||          m_transactions_waiting_apply >              get_flow_control_applier_threshold_var());}由此来看,触发流控需满足以下条件:

  1. group_replication_flow_control_mode 设置为 QUOTA 。
  2. 当前等待认证的事务数大于 group_replication_flow_control_certifier_threshold 。
    当前等待认证的事务数可通过 performance_schema.replication_group_member_stats 中的 COUNT_TRANSACTIONS_IN_QUEUE 查看 。
  3. 当前等待应用的事务数大于 group_replication_flow_control_applier_threshold 。
    当前等待应用的事务数可通过 performance_schema.replication_group_member_stats 中的 COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE 查看 。
除了条件 1,条件 2,3 满足其一即可 。
当需要流控时,会将 m_holds_in_period 自增加 1 。
m_holds_in_period 这个变量会用在 Flow_control_module::flow_control_step 中 。
而 Flow_control_module::flow_control_step 是在 Certifier_broadcast_thread::dispatcher() 中调用的 , 每秒执行一次 。
void Certifier_broadcast_thread::dispatcher() {  ...  while (!aborted) {    ...    applier_module->run_flow_control_step();    ...    struct timespec abstime;    // 定义超时时长 1s 。    set_timespec(&abstime, 1);    mysql_cond_timedwait(&broadcast_dispatcher_cond, &broadcast_dispatcher_lock,                         &abstime);    mysql_mutex_unlock(&broadcast_dispatcher_lock);    broadcast_counter++;  }}void run_flow_control_step() override {  flow_control_module.flow_control_step(&pipeline_stats_member_collector);}配额的计算逻辑接下来我们重点分析下 flow_control_step 函数的处理逻辑 。
这个函数非常关键,它是整个流控模块的核心 。
它主要是用来计算 m_quota_size 和 m_quota_used 。
其中,m_quota_size 决定了下个周期允许提交的事务数,即我们所说的配额 。
m_quota_used 用来统计下个周期已经提交的事务数,在该函数中会重置为 0 。
void Flow_control_module::flow_control_step(    Pipeline_stats_member_collector *member) {  // 这里的 seconds_to_skip 实际上就是 group_replication_flow_control_period,后面会有定义 。  // 虽然 flow_control_step 是一秒调用一次,但实际起作用的还是 group_replication_flow_control_period 。  if (--seconds_to_skip > 0) return;    // holds 即 m_holds_in_period  int32 holds = m_holds_in_period.exchange(0);  // get_flow_control_mode_var() 即 group_replication_flow_control_mode  Flow_control_mode fcm =      static_cast<Flow_control_mode>(get_flow_control_mode_var());  // get_flow_control_period_var() 即 group_replication_flow_control_period  seconds_to_skip = get_flow_control_period_var();  // 计数器  m_stamp++;  // 发送当前节点的状态信息  member->send_stats_member_message(fcm);  switch (fcm) {    case FCM_QUOTA: {      // get_flow_control_hold_percent_var() 即 group_replication_flow_control_hold_percent , 默认是 10      // 所以 HOLD_FACTOR 默认是 0.9      double HOLD_FACTOR =          1.0 -          static_cast<double>(get_flow_control_hold_percent_var()) / 100.0;      // get_flow_control_release_percent_var() 即 group_replication_flow_control_release_percent,默认是 50      // 所以 RELEASE_FACTOR 默认是 1.5      double RELEASE_FACTOR =          1.0 +          static_cast<double>(get_flow_control_release_percent_var()) / 100.0;      // get_flow_control_member_quota_percent_var() 即 group_replication_flow_control_member_quota_percent,默认是 0      // 所以 TARGET_FACTOR 默认是 0      double TARGET_FACTOR =          static_cast<double>(get_flow_control_member_quota_percent_var()) /          100.0;      // get_flow_control_max_quota_var() 即 group_replication_flow_control_max_quota,默认是 0      int64 max_quota = static_cast<int64>(get_flow_control_max_quota_var());      // 将上一个周期的 m_quota_size,m_quota_used 赋值给 quota_size,quota_used , 同时自身重置为 0      int64 quota_size = m_quota_size.exchange(0);      int64 quota_used = m_quota_used.exchange(0);      int64 extra_quota = (quota_size > 0 && quota_used > quota_size)                              ? quota_used - quota_size                              : 0;      if (extra_quota > 0) {        mysql_mutex_lock(&m_flow_control_lock);        // 发送一个信号,释放 do_wait() 处等待的事务        mysql_cond_broadcast(&m_flow_control_cond);        mysql_mutex_unlock(&m_flow_control_lock);      }      // m_holds_in_period 大于 0,则意味着需要进行流控      if (holds > 0) {        uint num_writing_members = 0, num_non_recovering_members = 0;        // MAXTPS 是 INT 的最大值,即 2147483647        int64 min_certifier_capacity = MAXTPS, min_applier_capacity = MAXTPS,              safe_capacity = MAXTPS;        m_flow_control_module_info_lock->rdlock();        Flow_control_module_info::iterator it = m_info.begin();        // 循环遍历所有节点的状态信息        while (it != m_info.end()) {            // 这一段源码中没有,加到这里可以直观的看到触发流控时,每个节点的状态信息 。#ifndef NDEBUG            it->second.debug(it->first.c_str(), quota_size,                     quota_used);#endif          if (it->second.get_stamp() < (m_stamp - 10)) {            // 如果节点的状态信息在最近 10 个周期内都没有更新,则清掉            m_info.erase(it++);          } else {            if (it->second.get_flow_control_mode() == FCM_QUOTA) {              // 如果 group_replication_flow_control_certifier_threshold 大于 0,              // 且上一个周期进行认证的事务数大于 0,              // 且当前等待认证的事务数大于 group_replication_flow_control_certifier_threshold,              // 且上一个周期进行认证的事务数小于 min_certifier_capacity              // 则会将上一个周期进行认证的事务数赋予 min_certifier_capacity              if (get_flow_control_certifier_threshold_var() > 0 &&                  it->second.get_delta_transactions_certified() > 0 &&                  it->second.get_transactions_waiting_certification() -                          get_flow_control_certifier_threshold_var() >                      0 &&                  min_certifier_capacity >                      it->second.get_delta_transactions_certified()) {                min_certifier_capacity =                    it->second.get_delta_transactions_certified();              }              if (it->second.get_delta_transactions_certified() > 0)                // safe_capacity 取 safe_capacity 和 it->second.get_delta_transactions_certified() 中的较小值                safe_capacity =                    std::min(safe_capacity,                             it->second.get_delta_transactions_certified());              // 针对的是 applier,逻辑同 certifier 一样              if (get_flow_control_applier_threshold_var() > 0 &&                  it->second.get_delta_transactions_applied() > 0 &&                  it->second.get_transactions_waiting_apply() -                          get_flow_control_applier_threshold_var() >                      0) {                if (min_applier_capacity >                    it->second.get_delta_transactions_applied())                  min_applier_capacity =                      it->second.get_delta_transactions_applied();                if (it->second.get_delta_transactions_applied() > 0)                  // 如果上一个周期有事务应用 , 说明该节点不是 recovering 节点                  num_non_recovering_members++;              }              if (it->second.get_delta_transactions_applied() > 0)                // safe_capacity 取 safe_capacity 和 it->second.get_delta_transactions_applied() 中的较小值                safe_capacity = std::min(                    safe_capacity, it->second.get_delta_transactions_applied());              if (it->second.get_delta_transactions_local() > 0)                // 如果上一个周期有本地事务,则意味着该节点存在写入                num_writing_members++;            }            ++it;          }        }        m_flow_control_module_info_lock->unlock();        num_writing_members = num_writing_members > 0 ? num_writing_members : 1;        // min_capacity 取 min_certifier_capacity 和 min_applier_capacity 的较小值        int64 min_capacity = (min_certifier_capacity > 0 &&                              min_certifier_capacity < min_applier_capacity)                                 ? min_certifier_capacity                                 : min_applier_capacity;        // lim_throttle 是最小配额        int64 lim_throttle = static_cast<int64>(            0.05 * std::min(get_flow_control_certifier_threshold_var(),                            get_flow_control_applier_threshold_var()));        // get_flow_control_min_recovery_quota_var() 即 group_replication_flow_control_min_recovery_quota        if (get_flow_control_min_recovery_quota_var() > 0 &&            num_non_recovering_members == 0)          lim_throttle = get_flow_control_min_recovery_quota_var();        // get_flow_control_min_quota_var() 即 group_replication_flow_control_min_quota        if (get_flow_control_min_quota_var() > 0)          lim_throttle = get_flow_control_min_quota_var();        // min_capacity 不能太?。荒艿陀?nbsp;lim_throttle        min_capacity =            std::max(std::min(min_capacity, safe_capacity), lim_throttle);        // HOLD_FACTOR 默认是 0.9        quota_size = static_cast<int64>(min_capacity * HOLD_FACTOR);        // max_quota 是由 group_replication_flow_control_max_quota 定义的,即 quota_size 不能超过 max_quota        if (max_quota > 0) quota_size = std::min(quota_size, max_quota);                // num_writing_members 是有实际写操作的节点数        if (num_writing_members > 1) {          // 如果没有设置 group_replication_flow_control_member_quota_percent , 则按照节点数平分 quota_size          if (get_flow_control_member_quota_percent_var() == 0)            quota_size /= num_writing_members;          else          // 如果有设置,则当前节点的 quota_size 等于 quota_size * group_replication_flow_control_member_quota_percent / 100            quota_size = static_cast<int64>(static_cast<double>(quota_size) *                                            TARGET_FACTOR);        }        // quota_size 还会减去上个周期超额使用的 quota        quota_size =            (quota_size - extra_quota > 1) ? quota_size - extra_quota : 1;#ifndef NDEBUG        LogPluginErr(INFORMATION_LEVEL, ER_GRP_RPL_FLOW_CONTROL_STATS,                     quota_size, get_flow_control_period_var(),                     num_writing_members, num_non_recovering_members,                     min_capacity, lim_throttle);#endif      } else {        // 对应 m_holds_in_period = 0 的场景,RELEASE_FACTOR 默认是 1.5        if (quota_size > 0 && get_flow_control_release_percent_var() > 0 &&            (quota_size * RELEASE_FACTOR) < MAXTPS) {          // 当流控结束后,quota_size = 上一个周期的 quota_size * 1.5          int64 quota_size_next =              static_cast<int64>(quota_size * RELEASE_FACTOR);          quota_size =              quota_size_next > quota_size ? quota_size_next : quota_size + 1;        } else          quota_size = 0;      }      if (max_quota > 0)        // quota_size 会取 quota_size 和 max_quota 中的较小值        quota_size =            std::min(quota_size > 0 ? quota_size : max_quota, max_quota);      // 最后,将 quota_size 赋值给 m_quota_size,m_quota_used 重置为 0      m_quota_size.store(quota_size);      m_quota_used.store(0);      break;    }    // 如果 group_replication_flow_control_mode 为 DISABLED,    // 则会将 m_quota_size 和 m_quota_used 置为 0,这个时候会禁用流控 。    case FCM_DISABLED:      m_quota_size.store(0);      m_quota_used.store(0);      break;    default:      assert(0);  }  if (local_member_info->get_recovery_status() ==      Group_member_info::MEMBER_IN_RECOVERY) {    applier_module->get_pipeline_stats_member_collector()        ->compute_transactions_deltas_during_recovery();  }}

推荐阅读