Cyber-RT系列之中枢调度Scheduler

Catalogue
  1. 1. 前言
  2. 2. Cyber/Scheduler目录
  3. 3. Scheduler类图
  4. 4. 两种策略
  5. 5. 一个Scheduler实例
    1. 5.1. 实例化
  6. 6. ProcessorContext
    1. 6.1. ProcessorContext实例化
    2. 6.2. ClassicContext的静态容器
      1. 6.2.1. ClassicContext::cr_group数据结构
      2. 6.2.2. ClassicContext::rq_locks数据结构
    3. 6.3. ClassicContext的动态容器
      1. 6.3.1. ClassicContext::multi_pri_rq队列
  7. 7. 重要过程时序流图
    1. 7.1. 创建SchedulerClassic实例
    2. 7.2. 创建Processor并绑定上下文
    3. 7.3. 创建Task并分配给Processor
    4. 7.4. 运转核心Processor::Run
    5. 7.5. 唤醒机制SchedulerClassic::NotifyProcessor

前言

Scheduler是Cyber-RT的调度核心,是协程的调度载体。特别的,对于自动驾驶任务而言,任务调度的实时性发挥至关重要的作用,因此有必要对各种任务的优先级进行分类排序,如对于控制任务而言,需要单独分配CPU以供实时运行,Cyber-RT通过Scheduler来实现这种功能。

Cyber/Scheduler目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
├── BUILD
├── CMakeLists.txt
├── common
│ ├── cv_wrapper.h
│ ├── mutex_wrapper.h
│ ├── pin_thread.cc
│ ├── pin_thread.h
├── policy
│ ├── choreography_context.cc
│ ├── choreography_context.h
│ ├── classic_context.cc
│ ├── classic_context.h
│ ├── scheduler_choreography.cc
│ ├── scheduler_choreography.h
│ ├── scheduler_classic.cc
│ └── scheduler_classic.h
├── processor.cc
├── processor_context.cc
├── processor_context.h
├── processor.h
├── processor_test.cc
├── scheduler.cc
├── scheduler_factory.cc
├── scheduler_factory.h
├── scheduler.h
└── scheduler_test.cc

Scheduler类图

两种策略

通过阅读上面的类图可以发现,Scheduler类是基类,其拥有两个子类,分别为SchedulerClassicSchedulerChoreography,分别对应两种策略,Classic(经典)策略与Choreophgray(编排)策略。两者并不是互斥关系,后者可看作对前者的扩展。它们的介绍和示例可参考官方文档 Cyber RT Scheduler,这里暂不详细展开,下面的叙述以SchedulerClassic策略为主。

调度策略配置文件用protobuf定义,协议格式文件在cyber/proto目录下:scheduler_conf.protoclassic_conf.protochoreography_conf.proto。调度策略配置文件在cyber/conf目录下。对于上面mkz_close_loop.pb.txt中的两个process group:compute_sched和control_sched,根据调度策略不同分别有两个版本

一个Scheduler实例

Scheduler是个单例,因此在程序启动时就被初始化了,尽管它不是程序的入口,但是从它却是系统子模块Component调度的管理者。

实例化

Scheduler的实例化过程由scheduler_factory.cc提供,其调用函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Scheduler* Instance() {
// instance是原子模板的
Scheduler* obj = instance.load(std::memory_order_acquire);
// 如果obj为空
if (obj == nullptr) {
// 加锁
std::lock_guard<std::mutex> lock(mutex);
obj = instance.load(std::memory_order_relaxed);
// 双检查
if (obj == nullptr) {
// 默认策略
std::string policy("classic");
std::string conf("conf/");
// conf = conf/xxxxx.conf
conf.append(GlobalData::Instance()->ProcessGroup()).append(".conf");
// cfg_file = CYBER_PATH + 相对路径
auto cfg_file = GetAbsolutePath(WorkRoot(), conf);
// 检查配置文件是否存在,并读取配置到proto类型
apollo::cyber::proto::CyberConfig cfg;
if (PathExists(cfg_file) && GetProtoFromFile(cfg_file, &cfg)) {
// 从配置文件读取策略
policy = cfg.scheduler_conf().policy();
} else {
AWARN << "No sched conf found, use default conf.";
}
// 根据策略,实例化不同的调度器
if (!policy.compare("classic")) {
obj = new SchedulerClassic();
} else if (!policy.compare("choreography")) {
obj = new SchedulerChoreography();
} else {
AWARN << "Invalid scheduler policy: " << policy;
obj = new SchedulerClassic();
}
// 保存到instance单例
instance.store(obj, std::memory_order_release);
}
}
return obj;
}

实例化主要做了几个事情:

  1. 获取配置文件conf/xxxx.conf
  2. 读取策略配置项policy
  3. 根据policy配置项来选择实例化SchedulerClassic()还是SchedulerChoreography(),如果没有提供,则默认选择实例化SchedulerClassic()
  4. 这个实现确保了线程安全,即Scheduler单例只能被创建一个

SchedulerClassic()为例,接下来看其实例化过程(构造函数):

1
2
3
4
5
6
7
8
9
SchedulerClassic::SchedulerClassic(){
// 由于函数篇幅较长,下面用文字描述
1. 再次获取配置文件"conf/xxxxx.conf"
2. 解析proto配置文件
3. 读取threads字段的配置 ===> 这个暂时还不知道用来干嘛
4. 读取cpu编号配置[process_level_cpuset_],根据这个配置项来设置当前调度线程的CPU亲和性,这样就指定了该进程中的所有任务都只能在限定的CPU核上运行
5. 读取[classic_conf]配置项,遍历配置项中的每一个[group]配置,而每个[group]内又包含若干个[task],目的是把所有[task]的配置都保存下来,记录到成员变量cr_confs_内。需要注意的点是,在这个过程中,会根据每个[task]所在的[group]填充group_name,后面会用到。
6. 最后为每个group创建对应数量的Processor,并设置相关策略
}

实例化过程中,有很多部分都与配置文件直接相关,这里我们以conf/example_sched_classic.conf为例展开:

我们从第4点开始看,首先是读取配置项process_level_cpuset_,根据配置文件,这个字符串一般填"0-7,16-23"形式的内容,表示当前调度线程可以由0-7,16-23号CPU核心来执行。最终实现这个功能的函数是Scheduler::ProcessLevelResourceControl(),其内部是通过glibc提供的接口pthread_setaffinity_np(pthread_self(), sizeof(set), &set);来实现对线程设置CPU亲和性。

注意配置文件中有两个优先级:

  • 一个是processor_prio,对应系统Linux中线程的优先级,即nice值,范围从-20到19,值越低优先级越高,默认值为0;
  • 另一个是task的prio,它是Cyber RT中的协程调度的优先级,共20级,值越高优先越高

接下来第5点,简单来说就是读取每个[task]字段的配置项,并保存下来,其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
classic_conf_ = cfg.scheduler_conf().classic_conf();
// 遍历每一个group
for (auto& group : classic_conf_.groups()) {
auto& group_name = group.name();
// 遍历group内的task配置项
for (auto task : group.tasks()) {
// 对task反过来设置group_name
task.set_group_name(group_name);
// 保存起来
cr_confs_[task.name()] = task;
}
}

对应配置文件举例如下:

这里有一点需要注意的是,由于配置文件中每个[task]内部没有写明其所在的[group]名称,因此在读取配置的时候,通过task.set_group_name(group_name);进行了设置,这个group_name相当的重要,接下来会用到。

最后看第6点,为每个group创建对应数量的Processor,并设置相关策略,这是由函数SchedulerClassic::CreateProcessor()来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void SchedulerClassic::CreateProcessor() {
// 遍历每一个group
for (auto& group : classic_conf_.groups()) {
1. 获取该group的group_name、proc_num配置参数
2. 获取该group的affinity、processor_policy、processor_prio配置参数
3. 内部再进行一次遍历,为每个组创建对应数量的processor,部分代码如下:

for (uint32_t i = 0; i < proc_num; i++) {
3.1 先创建执行器的上下文ClassicContext,并放入全局的pctxs_队列中
auto ctx = std::make_shared<ClassicContext>(group_name);
pctxs_.emplace_back(ctx);
3.2 创建执行器Processor,并将上面创建的上下文与之绑定
auto proc = std::make_shared<Processor>();
3.3 关键:
proc->BindContext(ctx);
3.4 设置该Processor的CPU亲和性,根据affinity = "range" or "1to1"来决定该Processor在多个CPU上进行还是单个CPU上进行
SetSchedAffinity(proc->Thread(), cpuset, affinity, i);
3.5 设置调度策略
SetSchedPolicy(proc->Thread(), processor_policy, processor_prio,
proc->Tid());
3.6 将这个新创建的Processor保存到容器
processors_.emplace_back(proc);
}
}
}

其中,通过SetSchedAffinity()函数对新创建的某个Processor对象内部的thread进行CPU亲和性设置,以实现CPU的分配,特别的,根据该groupaffinity配置项,有两种CPU分配策略:

  • range: 采用range策略,即每个Processor对象内部的thread都可以由cpuset字段给定的范围内自由调度,即范围内的CPU都可以处理
  • 1to1: 即每个Processor对象内部的thread只能对应一个CPU核心,cpuset字段提供的是有多少个CPU可分配,但是个Processor对象内部的thread只能对应cpuset字段的第i个核心

以上是关于线程的CPU亲和性设置,接下来的一个关键问题是,创建这么多Processor,如何真正处理我们的任务?

代码片段中的3.3是关键,proc->BindContext(ctx)将新创建的ProcessorContext上下文保存到成员变量Processor::context_,然后开启线程来执行Processor::Run,该函数代码不长,就两句:

1
2
3
4
5
void Processor::BindContext(const std::shared_ptr<ProcessorContext>& context) {
context_ = context;
std::call_once(thread_flag_,
[this]() { thread_ = std::thread(&Processor::Run, this); });
}

该函数首先保存传进来的ProcessorContext对象指针,然后启动了一个线程,看来Processor::Run这个线程就是实际执行任务的线程了,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void Processor::Run() {
// 暂时先忽略snap_shot_这个对象的内容

// 这是一个循环,只要Processor的running_状态weitrue
while (cyber_likely(running_.load())) {
// 检查context_不为空
if (cyber_likely(context_ != nullptr)) {
// 尝试从cr_group_获取下一个协程
auto croutine = context_->NextRoutine();
// 如果获取成功
if (croutine) {
// 恢复协程运行
croutine->Resume();
// 释放协程锁( 上面获取协程context_->NextRoutine()时,调用了croutine->Aquire() )
croutine->Release();
} else {
// 阻塞等待,超时自动解除阻塞
context_->Wait();
}
} else {
// 如果context_为空,阻塞10毫秒,超时后结束阻塞,下一次循环
std::unique_lock<std::mutex> lk(mtx_ctx_);
cv_ctx_.wait_for(lk, std::chrono::milliseconds(10));
}
}
}

Processor::Run这个线程不断的尝试从context_成员变量中获取协程,并恢复协程运行,一个协程的任务处理完后,继续从上下文获取下一个协程context_->NextRoutine();。至于这个协程的切入和切出,到后面协程篇章的时候再详细讨论。接下来,就要看看这个上下文成员变量context_是啥。

processor.h头文件内可知,成员变量context_声明如下:

1
2
// ProcessorContext只是基类,实际保存下来的是 ClassicContext 或 ChoreographyContext
std::shared_ptr<ProcessorContext> context_;

即每个Processor对象内维护着一个ProcessorContext

ProcessorContext

实际上,ProcessorContext只是基类,根据Scheduler的两种策略,分别对应着两种ProcessorContext,分别是:

  • ClassicContext
  • ChoreographyContext

uml类图如下:

接下来,以ClassicContext为主进行展开:

ProcessorContext实例化

ProcessorContext实例化不是在Processor内部进行,而是在上面提到的SchedulerClassic::CreateProcessor()函数中进行,具体代码片段为:

1
auto ctx = std::make_shared<ClassicContext>(group_name);

接下来,看ClassicContext的构造函数:

1
2
3
4
5
6
7
8
ClassicContext::ClassicContext()
{
InitGroup(DEFAULT_GROUP_NAME);
}

ClassicContext::ClassicContext(const std::string& group_name) {
InitGroup(group_name);
}

ClassicContext类有两个构造函数,其中一个需要传参[group_name],另外一个不需传参。两个构造函数内部都调用了ClassicContext::InitGroup函数,该函数从全局静态容器中取对应[group_name]的引用并保存到ClassicContext类的成员变量中,目的是,当其他地方向全局静态容器添加元素时,直接调用ClassicContext类的成员变量即可访问到新增加的元素。该函数代码如下:

1
2
3
4
5
6
7
8
9
10
void ClassicContext::InitGroup(const std::string& group_name) {
// cr_group_是ClassicContext类静态成员变量
// cr_group_包含了多个组,每个组有分为多个优先级,每个优先级对应着多个协程
multi_pri_rq_ = &cr_group_[group_name]; // 取cr_group_中对应group_name的组,保存到multi_pri_rq_
lq_ = &rq_locks_[group_name]; // 取rq_locks_中对应group_name的组,保存到lq_
mtx_wrapper_ = &mtx_wq_[group_name]; // 取mtx_wrapper_中对应group_name的组,保存到mtx_wrapper_
cw_ = &cv_wq_[group_name]; // 取cv_wq_中对应group_name的组,保存到cw_
notify_grp_[group_name] = 0;
current_grp = group_name;
}

ClassicContext的静态容器

上面提到ClassicContext类里面有几个全局静态容器,其在classic_context.h中声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Class ClassicContext{
...
public:
// CR_GROUP: 容器[组名] = std::array<std::vector<std::shared_ptr<CRoutine>>, MAX_PRIO>>
alignas(CACHELINE_SIZE) static CR_GROUP cr_group_; ///< cr_group_包含了多个组,每个组有分为多个优先级,每个优先级对应着多个协程

// RQ_LOCK_GROUP: 容器[组名] = std::array<base::AtomicRWLock, MAX_PRIO>;
alignas(CACHELINE_SIZE) static RQ_LOCK_GROUP rq_locks_; ///< rq_locks_包含多个组,每组分为多个优先级,每个优先级对应一个base::AtomicRWLock,关于cr_group_变量的锁

// GRP_WQ_CV: 容器[组名] = CvWrapper
alignas(CACHELINE_SIZE) static GRP_WQ_CV cv_wq_; ///< cv_wq_包含多个组,每组对应一个CvWrapper

// GRP_WQ_MUTEX: 容器[组名] = MutexWrapper
alignas(CACHELINE_SIZE) static GRP_WQ_MUTEX mtx_wq_; ///< mtx_wq_包含多个组,每组对应一个MutexWrapper,是关于notify_grp_[group_name]的锁

// NOTIFY_GRP: 容器[组名] = int
alignas(CACHELINE_SIZE) static NOTIFY_GRP notify_grp_; ///< notify_grp_包含多个组,每组对应一个int
}

这几个容器都是Public的且静态的,所以其他地方可以直接往里面读写数据,而线程问题则通过rq_locks_容器和mtx_wq_容器进行加锁控制。

实际上,Scheduler要调度的任务,都保存到了这几个静态容器内部,如何存进去,以及存了什么进去,SchedulerClassic::DispatchTask函数给出了答案,该函数以一个协程指针作为参数,做了以下几个工作:

  1. 首先把这个新协程放入Scheduler::id_cr_中,
  2. 然后根据[协程名]查找保存的构造Scheduler单例时,产生的cr_confs_中是否有该task对应的策略,如果有就根据策略设置该协程的[优先级]和[group_name]
  3. 最后往ClassicContext中的全局静态变量ClassicContext::cr_group_中对应该协程的[group_name]和优先级的队列中加入该协程
  4. 最后调用ClassicContext::Notify(来通知该协程所属的组,让Processor::Run()结束阻塞,马上运行一次

这个函数,着重看以下代码块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
bool SchedulerClassic::DispatchTask(const std::shared_ptr<CRoutine>& cr) {

...

// 根据协程名进行查表,cr_confs_在实例化SchedulerClassic对象时,就根据配置文件读取配置进去了,对应的是sched_classic.conf配置文件中的"tasks:"项
if (cr_confs_.find(cr->name()) != cr_confs_.end()) {
// 找到配置项,则取对应的value
ClassicTask task = cr_confs_[cr->name()];
// 协程设置属性(从ClassicTask获取)
cr->set_priority(task.prio()); // 设置优先级?
cr->set_group_name(task.group_name()); // 设置分组名? 如果没有给这个值呢? ==> 在SchedulerClassic::SchedulerClassic()构造时,会设置值的
} else {
// 如果cr_confs_没有这个协程名的条目,直接设置分组名为默认的 classic_conf_.groups(0).name()
// croutine that not exist in conf
cr->set_group_name(classic_conf_.groups(0).name());
}

// Enqueue task.
{
// 取ClassicContext::cr_group_对应该组该优先级的写锁
WriteLockGuard<AtomicRWLock> lk(
ClassicContext::rq_locks_[cr->group_name()].at(cr->priority()));
// 将输入参数中的协程添加到ClassicContext::cr_group_,等待被调度
ClassicContext::cr_group_[cr->group_name()]
.at(cr->priority())
.emplace_back(cr);
}
...
}

所以说,需要调度的协程,都通过这个函数,根据协程所在的组,把协程指针保存到ClassicContext::cr_group_静态变量中。

ClassicContext::cr_group数据结构

ClassicContext::cr_group_是一张大的表格,分为多个[group],每个[group]又分多个优先级,每个优先级对应着一个std::vectorvector内部存放着多个协程。

一个需要注意的点是,由于ClassicContext::cr_group_是静态变量,多个线程访问时会有data race的问题,因此cyber-rt增加了对应的锁ClassicContext::rq_locks_来解决。

ClassicContext::rq_locks数据结构

显然,ClassicContext::rq_locks_内按group_name进行分组,每个组内又按优先级进行划分,因此,这里一个锁对应ClassicContext::cr_group_中的一个std::vector<CRoutine>,其对应关系如下图所示:

ClassicContext的动态容器

前面讨论了ClassicContext的静态成员变量,接下来我们看其类内动态成员变量。代码片段如下:

1
2
3
4
5
6
7
8
9
10
class ClassicContext : public ProcessorContext {
...
private:
// std::array<std::vector<std::shared_ptr<CRoutine>>, MAX_PRIO>
MULTI_PRIO_QUEUE *multi_pri_rq_ = nullptr; ///< 优先队列,每个优先级有一个协程列表
LOCK_QUEUE *lq_ = nullptr;
MutexWrapper *mtx_wrapper_ = nullptr; ///< 这个是对成员notify_grp_[current_grp]的锁
CvWrapper *cw_ = nullptr;
std::string current_grp;
}

ClassicContext::multi_pri_rq队列

在函数ClassicContext::InitGroup中,将ClassicContext::cr_group_的某个组的引用赋值给了成员变量ClassicContext::multi_pri_rq_,因此,ClassicContext::multi_pri_rq_的数据结构如下:

ClassicContext::multi_pri_rq_ClassicContext::cr_group_的映射关系如下:

所以,通过向全局静态表ClassicContext::cr_group_写入协程后,可以通过ClassicContext::multi_pri_rq_来读取对应的协程,最后通过Processor::Run来完成协程的调用。

重要过程时序流图

创建SchedulerClassic实例

创建Processor并绑定上下文

创建Task并分配给Processor

Scheduler::CreateTask这个函数是Scheduler基类的函数,其内部会调用子类SchedulerClassicSchedulerChoreographyDispatchTask()函数,最后将子类的NotifyProcessor函数注册给DataVistor对象,其详细的时序流程如下:

运转核心Processor::Run

唤醒机制SchedulerClassic::NotifyProcessor