Linux中断处理的“下半部”机制

2023-05-16

前言

中断分为硬件中断,软件中断。中断的处理原则主要有两个:一个是不能嵌套,另外一个是越快越好。在Linux中,分为中断处理采用“上半部”和“下半部”处理机制。

一、中断处理“下半部”机制

中断服务程序一般都是在中断请求关闭的条件下执行的,以避免嵌套而使中断控制复杂化。但是,中断是一个随机事件,它随时会到来,如果关中断的时间太长,CPU就不能及时响应其他的中断请求,从而造成中断的丢失。

因此,Linux内核的目标就是尽可能快的处理完中断请求,尽其所能把更多的处理向后推迟。例如,假设一个数据块已经达到了网线,当中断控制器接受到这个中断请求信号时,Linux内核只是简单地标志数据到来了,然后让处理器恢复到它以前运行的状态,其余的处理稍后再进行(如把数据移入一个缓冲区,接受数据的进程就可以在缓冲区找到数据)。

因此,内核把中断处理分为两部分:上半部(top-half)和下半部(bottom-half),上半部 (就是中断服务程序)内核立即执行,而下半部(就是一些内核函数)留着稍后处理。

首先:一个快速的“上半部”来处理硬件发出的请求,它必须在一个新的中断产生之前终止。通常,除了在设备和一些内存缓冲区(如果你的设备用到了DMA,就不止这些)之间移动或传送数据,确定硬件是否处于健全的状态之外,这一部分做的工作很少。
第二:“下半部”运行时是允许中断请求的,而上半部运行时是关中断的,这是二者之间的主要区别。

内核到底什么时候执行下半部,以何种方式组织下半部?

这就是我们要讨论的下半部实现机制,这种机制在内核的演变过程中不断得到改进,在以前的内核中,这个机制叫做bottom-half(以下简称BH)。但是,Linux的这种bottom-half机制有两个缺点:

  • 在任意一时刻,系统只能有一个CPU可以执行BH代码,以防止两个或多个CPU同时来执行BH函数而相互干扰。因此BH代码的执行是严格“串行化”的。
  • BH函数不允许嵌套。

这两个缺点在单CPU系统中是无关紧要的,但在SMP系统中却是非常致命的。因为BH机制的严格串行化执行显然没有充分利用SMP系统的多CPU特点。为此,在2.4以后的版本中有了新的发展和改进,改进的目标使下半部可以在多处理机上并行执行,并有助于驱动程序的开发者进行驱动程序的开发。下面主要介绍3种5.4内核中的“下半部”处理机制:

  • 软中断请求(softirq)机制
  • 小任务(tasklet)机制
  • 工作队列机制
  • threaded_irq机制

以上三种机制的比较如下图所示

img

二、软中断请求(softirq)机制

Linux的softirq机制是与SMP紧密不可分的。为此,整个softirq机制的设计与实现中自始自终都贯彻了一个思想:“谁触发,谁执行”(Who marks,Who runs),也即触发软中断的那个CPU负责执行它所触发的软中断,而且每个CPU都有它自己的软中断触发与控制机制。这个设计思想也使得softirq机制充分利用了SMP系统的性能和特点。

2.1 软中断描述符

Linux在include/linux/interrupt.h头文件中定义了数据结构softirq_action,来描述一个软中断请求,如下所示:

/* PLEASE, avoid to allocate new softirqs, if you need not _really_ high
   frequency threaded job scheduling. For almost all the purposes
   tasklets are more than enough. F.e. all serial device BHs et
   al. should be converted to tasklets, not to softirqs.
 */

enum
{
	HI_SOFTIRQ=0,	//用于实现高优先级的软中断
	TIMER_SOFTIRQ,
	NET_TX_SOFTIRQ,	//用于实现网络数据的发送
	NET_RX_SOFTIRQ, //用于实现网络数据的接收
	BLOCK_SOFTIRQ,
	IRQ_POLL_SOFTIRQ,
	TASKLET_SOFTIRQ, //用于实现tasklet软中断
	SCHED_SOFTIRQ,
	HRTIMER_SOFTIRQ, /* Unused, but kept as tools rely on the
			    numbering. Sigh! */
	RCU_SOFTIRQ,    /* Preferable RCU should always be the last softirq */

	NR_SOFTIRQS		//大小等于10,最后一个
};

/* map softirq index to softirq name. update 'softirq_to_name' in
 * kernel/softirq.c when adding a new softirq.
 */
extern const char * const softirq_to_name[NR_SOFTIRQS];

/* softirq mask and active fields moved to irq_cpustat_t in
 * asm/hardirq.h to get better cache usage.  KAO
 */

struct softirq_action
{
	void	(*action)(struct softirq_action *);	//函数指针action指向软中断请求的服务函数
};

asmlinkage void do_softirq(void);
asmlinkage void __do_softirq(void);

其中,函数指针action指向软中断请求的服务函数。基于上述软中断描述符,Linux在kernel/softirq.c文件中定义了一个全局的softirq_vec数组:

static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;

在这里系统一共定义了10个软中断请求描述符。软中断向量i(0≤i≤9)所对应的软中断请求描述符就是softirq_vec[i]。这个数组是个系统全局数组,即它被所有的CPU所共享。这里需要注意的一点是:每个CPU虽然都有它自己的触发和控制机制,并且只执行他自己所触发的软中断请求,但是各个CPU所执行的软中断服务例程却是相同的,也即都是执行softirq_vec[ ]数组中定义的软中断服务函数。Linux在kernel/softirq.c中的相关代码如下:

/*
   - No shared variables, all the data are CPU local.
   - If a softirq needs serialization, let it serialize itself
     by its own spinlocks.
   - Even if softirq is serialized, only local cpu is marked for
     execution. Hence, we get something sort of weak cpu binding.
     Though it is still not clear, will it result in better locality
     or will not.

   Examples:
   - NET RX softirq. It is multithreaded and does not require
     any global serialization.
   - NET TX softirq. It kicks software netdevice queues, hence
     it is logically serialized per device, but this serialization
     is invisible to common code.
   - Tasklets: serialized wrt itself.
 */

#ifndef __ARCH_IRQ_STAT
DEFINE_PER_CPU_ALIGNED(irq_cpustat_t, irq_stat);	//较老版本内核irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;
EXPORT_PER_CPU_SYMBOL(irq_stat);	//较老版本内核EXPORT_SYMBOL(irq_stat)
#endif

static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;

DEFINE_PER_CPU(struct task_struct *, ksoftirqd);

const char * const softirq_to_name[NR_SOFTIRQS] = {
	"HI", "TIMER", "NET_TX", "NET_RX", "BLOCK", "IRQ_POLL",
	"TASKLET", "SCHED", "HRTIMER", "RCU"
};

2.2 软中断触发机制

要实现“谁触发,谁执行”的思想,就必须为每个CPU都定义它自己的触发和控制变量。为此,Linux在\Linux-5.4\arch\arm\include\asm\hardirq.h头文件中定义了数据结构irq_cpustat_t来描述一个CPU的中断统计信息,其中就有用于触发和控制软中断的成员变量。数据结构irq_cpustat_t的定义如下:

IPI: 处理器间的中断(Inter-Processor Interrupts)

#define NR_IPI	7		//Inter-Processor Interrupts,IPI

typedef struct {
	unsigned int __softirq_pending;
#ifdef CONFIG_SMP
	unsigned int ipi_irqs[NR_IPI];
#endif
} ____cacheline_aligned irq_cpustat_t;

#define __inc_irq_stat(cpu, member)	__IRQ_STAT(cpu, member)++
#define __get_irq_stat(cpu, member)	__IRQ_STAT(cpu, member)

#define __IRQ_STAT(cpu, member) (irq_stat[cpu].member)
//\Linux-5.4\include\linux\irq_cpustat.h文件中
DECLARE_PER_CPU_ALIGNED(irq_cpustat_t, irq_stat);	/* defined in asm/hardirq.h */
#define __IRQ_STAT(cpu, member)	(per_cpu(irq_stat.member, cpu))

//\Linux-5.4\include\linux\percpu-defs.h文字中
/*
 * Normal declaration and definition macros.
 */
#define DECLARE_PER_CPU_SECTION(type, name, sec)			\
	extern __PCPU_ATTRS(sec) __typeof__(type) name

#define DECLARE_PER_CPU_ALIGNED(type, name)				\
	DECLARE_PER_CPU_SECTION(type, name, PER_CPU_ALIGNED_SECTION)	\
	____cacheline_aligned

展开可得到以下定义:

 irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;

其中:

  1. NR_CPUS,为系统中CPU个数。
  2. 这样,每个CPU都只操作它自己的中断统计信息结构。假设有一个编号为id的CPU,那么它只能操作它自己的中断统计信息结构irq_stat[id](0≤id≤NR_CPUS-1),从而使各CPU之间互不影响。

使用open_softirq()函数可以注册软中断对应的处理函数,而raise_softirq()函数可以出发一个软件中断。

//使用softirq机制需要通过open_softirq来注册软中断处理函数,使中断索引号与中断处理函数对应。该函数定义在kernel/softirq.c文件中。
//该函数将软中断的中断处理函数指针赋值给相应的softirq_vec。
//nr为中断号,action为中断处理函数
void open_softirq(int nr, void (*action)(struct softirq_action *))
{
	softirq_vec[nr].action = action;
}
//在中断处理函数完成了紧急的硬件操作后,就应该调用raise_softirq函数来触发软中断,让软中断来处理耗时的中断下半部操作。
//如果没有处于中断中,则立马唤醒线程softirqd
void raise_softirq(unsigned int nr)		//触发一个软中断,nr为中断号
{
	unsigned long flags;

	local_irq_save(flags);
	raise_softirq_irqoff(nr);	//raise_softirq_irqoff来触发相应的软中断,将相应的bit位置位
	local_irq_restore(flags);
}

raise_softirq_irqoff函数:

/*
 * This function must run with irqs disabled!
 */
inline void raise_softirq_irqoff(unsigned int nr)
{
	__raise_softirq_irqoff(nr);

	/*
	 * If we're in an interrupt or softirq, we're done
	 * (this also catches softirq-disabled code). We will
	 * actually run the softirq once we return from
	 * the irq or softirq.
	 *
	 * Otherwise we wake up ksoftirqd to make sure we
	 * schedule the softirq soon.
	 */
	if (!in_interrupt())
		wakeup_softirqd();
}

通过in_interrupt判断现在是否在中断上下文中,或者软中断是否被禁止,如果都不成立,我们必须要调用wakeup_softirqd函数用来唤醒本CPU上的softirqd这个内核线程。

2.3 初始化软中断

//初始化软中断(softirq_init)
//在start_kernel()进行系统初始化中,就调用了softirq_init()函数对HI_SOFTIRQ和TASKLET_SOFTIRQ两个软中断进行了初始化
//Linux-5.4\init\main.c -> softirq_init()
void __init softirq_init(void)
{
	int cpu;

	for_each_possible_cpu(cpu) {
		per_cpu(tasklet_vec, cpu).tail =
			&per_cpu(tasklet_vec, cpu).head;
		per_cpu(tasklet_hi_vec, cpu).tail =
			&per_cpu(tasklet_hi_vec, cpu).head;
	}

	open_softirq(TASKLET_SOFTIRQ, tasklet_action);	//设置软中断服务函数
	open_softirq(HI_SOFTIRQ, tasklet_hi_action);	//设置软中断服务函数
}

2.4 软中断服务的执行及处理

2.4.1 在中断返回现场调度__do_softirq

在中断处理程序中触发软中断是最常见的形式,一个硬件中断处理完成之后。下面的函数在处理完硬件中断之后退出中断处理函数,在irq_exit中会触发软件中断的处理。

中断处理模型:

这里写图片描述

硬中断处理过程:

fastcall unsigned int do_IRQ(struct pt_regs *regs)
{
        ...
        irq_enter();

        //handle external interrupt (ISR)
        ...
          irq_exit();

        return 1;
}

硬中断处理完毕后会执行irq_exit()函数

/*
 * Exit an interrupt context. Process softirqs if needed and possible:
 */
void irq_exit(void)
{
#ifndef __ARCH_IRQ_EXIT_IRQS_DISABLED
	local_irq_disable();
#else
	lockdep_assert_irqs_disabled();
#endif
	account_irq_exit_time(current);
	preempt_count_sub(HARDIRQ_OFFSET);
	if (!in_interrupt() && local_softirq_pending())	//如果没有处于中断上下文中,以及没有标志位,则调用invoke_softirq函数,调用__do_softirq函数,或者唤醒处理软中断的线程。
		invoke_softirq();

	tick_irq_exit();
	rcu_irq_exit();
	trace_hardirq_exit(); /* must be last! */
}

在irq_exit()函数中会调用invoke_softirq函数

static inline void invoke_softirq(void)
{
	if (ksoftirqd_running(local_softirq_pending()))
		return;

	if (!force_irqthreads) {
#ifdef CONFIG_HAVE_IRQ_EXIT_ON_IRQ_STACK
		/*
		 * We can safely execute softirq on the current stack if
		 * it is the irq stack, because it should be near empty
		 * at this stage.
		 */
		__do_softirq();	/* 软中断处理函数 */
#else
		/*
		 * Otherwise, irq_exit() is called on the task stack that can
		 * be potentially deep already. So call softirq in its own stack
		 * to prevent from any overrun.
		 */
		do_softirq_own_stack();	//__do_softirq();
#endif
	} else {
		wakeup_softirqd();/* 如果强制使用软中断线程进行软中断处理,会通知调度器唤醒软中断线程ksoftirqd */
	}
}

函数do_softirq()负责执行数组softirq_vec[i]中设置的软中断服务函数。每个CPU都是通过执行这个函数来执行软中断服务的。由于同一个CPU上的软中断服务例程不允许嵌套,因此,do_softirq()函数一开始就检查当前CPU是否已经正出在中断服务中,如果是则do_softirq()函数立即返回。举个例子,假设CPU0正在执行do_softirq()函数,执行过程产生了一个高优先级的硬件中断,于是CPU0转去执行这个高优先级中断所对应的中断服务程序。众所周知,所有的中断服务程序最后都要跳转到do_IRQ()函数并由它来依次执行中断服务队列中的ISR,这里我们假定这个高优先级中断的ISR请求触发了一次软中断,于是do_IRQ()函数在退出之前看到有软中断请求,从而调用do_softirq()函数来服务软中断请求。因此,CPU0再次进入do_softirq()函数(也即do_softirq()函数在CPU0上被重入了)。但是在这一次进入do_softirq()函数时,它马上发现CPU0此前已经处在中断服务状态中了,因此这一次do_softirq()函数立即返回。于是,CPU0回到该开始时的do_softirq()函数继续执行,并为高优先级中断的ISR所触发的软中断请求补上一次服务。从这里可以看出,do_softirq()函数在同一个CPU上的执行是串行的。

do_softirq函数:

asmlinkage __visible void do_softirq(void)
{
	__u32 pending;
	unsigned long flags;
	/* 判断是否在中断处理中,如果正在中断处理,就直接返回 */
	if (in_interrupt())
		return;
	/* 保存当前寄存器的值 */
	local_irq_save(flags);
	/* 取得当前已注册软中断的位图 */
	pending = local_softirq_pending();

    /* 循环处理所有已注册的软中断 */
	if (pending && !ksoftirqd_running(pending))
		do_softirq_own_stack();		//__do_softirq

	local_irq_restore(flags);
}

__do_softirq函数:

asmlinkage __visible void __softirq_entry __do_softirq(void)
{
	unsigned long end = jiffies + MAX_SOFTIRQ_TIME;	/* 为了防止软中断执行时间太长,设置了一个软中断结束时间 */
	unsigned long old_flags = current->flags;		 /* 保存当前进程的标志 */
	int max_restart = MAX_SOFTIRQ_RESTART;			/* 软中断循环执行次数: 10次 */
	struct softirq_action *h;						/* 软中断的action指针 */
	bool in_hardirq; 
	__u32 pending;
	int softirq_bit;

	/*
	 * Mask out PF_MEMALLOC as the current task context is borrowed for the
	 * softirq. A softirq handled, such as network RX, might set PF_MEMALLOC
	 * again if the socket is related to swapping.
	 */
	current->flags &= ~PF_MEMALLOC;

	pending = local_softirq_pending();			/* 获取此CPU的__softirq_pengding变量值 */
	account_irq_enter_time(current);			/* 用于统计进程被软中断使用时间 */

	__local_bh_disable_ip(_RET_IP_, SOFTIRQ_OFFSET);	/* 增加preempt_count软中断计数器,也表明禁止了调度 */
	in_hardirq = lockdep_softirq_start();

restart:										/* 循环10次的入口,每次循环都会把所有挂起需要执行的软中断执行一遍 */
	/* 该CPU的__softirq_pending清零,当前的__softirq_pending保存在pending变量中 */
    /* 这样做就保证了新的软中断会在下次循环中执行 */
	
	/* Reset the pending bitmask before enabling irqs */
	set_softirq_pending(0);
	
	local_irq_enable();		/* 开中断 */

	h = softirq_vec;		/* h指向软中断数组头 */

	while ((softirq_bit = ffs(pending))) {/* 每次获取最高优先级的已挂起软中断 */
		unsigned int vec_nr;
		int prev_count;

		h += softirq_bit - 1;		/* 获取此软中断描述符地址 */

		vec_nr = h - softirq_vec;	/* 减去软中断描述符数组首地址,获得软中断号 */
		prev_count = preempt_count();	/* 获取preempt_count的值 */

		kstat_incr_softirqs_this_cpu(vec_nr);	/* 增加统计中该软中断发生次数 */

		trace_softirq_entry(vec_nr);
		h->action(h);							/* 执行该软中断的action操作 */
		trace_softirq_exit(vec_nr);
		/* 之前保存的preempt_count并不等于当前的preempt_count的情况处理,也是简单的把之前的复制到当前的preempt_count上,这样做是防止最后软中断计数不为0导致系统不能够执行调度 */
		if (unlikely(prev_count != preempt_count())) {
			pr_err("huh, entered softirq %u %s %p with preempt_count %08x, exited with %08x?\n",
			       vec_nr, softirq_to_name[vec_nr], h->action,
			       prev_count, preempt_count());
			preempt_count_set(prev_count);
		}
		h++;/* h指向下一个软中断,但下个软中断并不一定需要执行,这里只是配合softirq_bit做到一个处理 */
		pending >>= softirq_bit;
	}

	if (__this_cpu_read(ksoftirqd) == current)
		rcu_softirq_qs();
	local_irq_disable();		/* 关中断 */
	/* 循环结束后再次获取CPU的__softirq_pending变量,为了检查是否还有软中断未执行 */
	pending = local_softirq_pending();
	/* 在还有软中断需要执行的情况下,如果时间片没有执行完,并且循环次数也没到10次,继续执行软中断 */
	if (pending) {
		if (time_before(jiffies, end) && !need_resched() &&
		    --max_restart)
			goto restart;
		/* 这里是有软中断挂起,但是软中断时间和循环次数已经用完,通知调度器唤醒软中断线程去执行挂起的软中断,软中断线程是ksoftirqd,这里只起到一个通知作用,因为在中断上下文中是禁止调度的 */
		wakeup_softirqd();
	}

	lockdep_softirq_end(in_hardirq);
	account_irq_exit_time(current);/* 用于统计进程被软中断使用时间 */
	/* 减少preempt_count中的软中断计数器 */
	__local_bh_enable(SOFTIRQ_OFFSET);
	WARN_ON_ONCE(in_interrupt());
	/* 还原进程标志 */
	current_restore_flags(old_flags, PF_MEMALLOC);
}

2.4.2 在守护线程ksoftirq中执行

虽然大部分的softirq是在中断退出的情况下执行,但是有几种情况会在ksoftirq中执行。

  1. 从上文看出,raise_softirq主动触发,而此时正好不是在中断上下文中,ksoftirq进程将被唤醒。
  2. 在irq_exit中执行软中断,但是在经过MAX_SOFTIRQ_RESTART次循环后,软中断还未处理完,这种情况,ksoftirq进程也会被唤醒。

所以加入守护线程这一机制,主要是担心一旦有大量的软中断等待执行,会使得内核过长地留在中断上下文中。

static void wakeup_softirqd(void)
{
	/* Interrupts are disabled: no need to stop preemption */
	struct task_struct *tsk = __this_cpu_read(ksoftirqd);//ksoftirqd这个线程

	if (tsk && tsk->state != TASK_RUNNING)
		wake_up_process(tsk);
}

softirq_threads线程函数为run_ksoftirqd:

//\kernel\softirq.c文件中
static struct smp_hotplug_thread softirq_threads = {
	.store			= &ksoftirqd,
	.thread_should_run	= ksoftirqd_should_run,
	.thread_fn		= run_ksoftirqd,
	.thread_comm		= "ksoftirqd/%u",
};

run_ksoftirqd函数:

static void run_ksoftirqd(unsigned int cpu)
{
	local_irq_disable();
	if (local_softirq_pending()) {
		/*
		 * We can safely run softirq on inline stack, as we are not deep
		 * in the task stack here.
		 */
		__do_softirq();		//最终还是调用这个函数。
		local_irq_enable();
		cond_resched();
		return;
	}
	local_irq_enable();
}

三、小任务机制

tasklet机制是一种较为特殊的软中断。

tasklet一词的原意是“小片任务”的意思,这里是指一小段可执行的代码,且通常以函数的形式出现。软中断向量HI_SOFTIRQ和TASKLET_SOFTIRQ均是用tasklet机制来实现的。

从某种程度上讲,tasklet机制是Linux内核对BH机制的一种扩展。在2.4内核引入了softirq机制后,原有的BH机制正是通过tasklet机制这个桥梁来将softirq机制纳入整体框架中的。正是由于这种历史的延伸关系,使得tasklet机制与一般意义上的软中断有所不同,而呈现出以下两个显著的特点:

  1. 与一般的软中断不同,某一段tasklet代码在某个时刻只能在一个CPU上运行,而不像一般的软中断服务函数(即softirq_action结构中的action函数指针)那样——在同一时刻可以被多个CPU并发地执行。
  2. 与BH机制不同,不同的tasklet代码在同一时刻可以在多个CPU上并发地执行,而不像BH机制那样必须严格地串行化执行(也即在同一时刻系统中只能有一个CPU执行BH函数)。

3.1 tasklet描述符

Linux用数据结构tasklet_struct来描述一个tasklet,每个结构代表一个独立的小任务。该数据结构定义在include/linux/interrupt.h头文件中。如下所示:

/* Tasklets --- multithreaded analogue of BHs.

   Main feature differing them of generic softirqs: tasklet
   is running only on one CPU simultaneously.

   Main feature differing them of BHs: different tasklets
   may be run simultaneously on different CPUs.

   Properties:
   * If tasklet_schedule() is called, then tasklet is guaranteed
     to be executed on some cpu at least once after this.
   * If the tasklet is already scheduled, but its execution is still not
     started, it will be executed only once.
   * If this tasklet is already running on another CPU (or schedule is called
     from tasklet itself), it is rescheduled for later.
   * Tasklet is strictly serialized wrt itself, but not
     wrt another tasklets. If client needs some intertask synchronization,
     he makes it with spinlocks.
 */

struct tasklet_struct
{
	struct tasklet_struct *next;
	unsigned long state;
	atomic_t count;
	void (*func)(unsigned long);
	unsigned long data;
};

其中:

  • next: 指向下一个tasklet的指针;

  • state: 定义了这个tasklet的当前状态。这一个32位的无符号长整数,当前只使用了bit[1]和bit[0]两个状态位。其中,bit[1]=1 表示这个tasklet当前正在某个CPU上被执行,它仅对SMP系统才有意义,其作用就是为了防止多个CPU同时执行一个tasklet的情形出现;bit[0]=1表示这个tasklet已经被调度去等待执行了。

对这两个状态位的宏定义如下所示(interrupt.h):

enum
{
	TASKLET_STATE_SCHED,	/* Tasklet is scheduled for execution */
	TASKLET_STATE_RUN	/* Tasklet is running (SMP only) */
};
  • count: 子计数count,对这个tasklet的引用计数值。

注:只有当count等于0时,tasklet代码段才能执行,也即此时tasklet是被使能的;如果count非零,则这个tasklet是被禁止的。任何想要执行一个tasklet代码段的人都首先必须先检查其count成员是否为0。

  • func:指向以函数形式表现的可执行tasklet代码段。
  • data:函数func的参数。这是一个32位的无符号整数,其具体含义可供func函数自行解释,比如将其解释成一个指向某个用户自定义数据结构的地址值。

Linux在interrupt.h头文件中又定义了两个用来定义tasklet_struct结构变量的辅助宏:

//定义一个tasklet_struct结构体
#define DECLARE_TASKLET(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }

#define DECLARE_TASKLET_DISABLED(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data }

显然,从上述源代码可以看出,用DECLARE_TASKLET宏定义的tasklet在初始化时是被使能的(enabled),因为其count成员为0。而用DECLARE_TASKLET_DISABLED宏定义的tasklet在初始时是被禁止的(disabled),因为其count等于1。

3.2 改变一个tasklet状态的操作

在这里,tasklet状态指两个方面:

  1. state:成员所表示的运行状态;
  2. count:成员决定的使能/禁止状态。

3.2.1 改变一个tasklet的运行状态

state成员中的bit[0]表示一个tasklet是否已被调度去等待执行,bit[1]表示一个tasklet是否正在某个CPU上执行。对于state变量中某位的改变必须是一个原子操作,因此可以用定义在include/asm/bitops.h头文件中的位操作来进行。
  由于bit[1]这一位(即TASKLET_STATE_RUN)仅仅对于SMP系统才有意义,因此Linux在Interrupt.h头文件中显示地定义了对TASKLET_STATE_RUN位的操作。如下所示:

#ifdef CONFIG_SMP
static inline int tasklet_trylock(struct tasklet_struct *t)
{
	return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
}
 
static inline void tasklet_unlock(struct tasklet_struct *t)
{
	smp_mb__before_clear_bit(); 
	clear_bit(TASKLET_STATE_RUN, &(t)->state);
}
 
static inline void tasklet_unlock_wait(struct tasklet_struct *t)
{
	while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { barrier(); }
}
#else
#define tasklet_trylock(t) 1
#define tasklet_unlock_wait(t) do { } while (0)
#define tasklet_unlock(t) do { } while (0)
#endif

显然,在SMP系统同,tasklet_trylock()宏将把一个tasklet_struct结构变量中的state成员中的bit[1]位设置成1,同时还返回bit[1]位的非。因此,如果bit[1]位原有值为1(表示另外一个CPU正在执行这个tasklet代码),那么tasklet_trylock()宏将返回值0,也就表示上锁不成功。如果bit[1]位的原有值为0,那么tasklet_trylock()宏将返回值1,表示加锁成功。而在单CPU系统中,tasklet_trylock()宏总是返回为1。
任何想要执行某个tasklet代码的程序都必须首先调用宏tasklet_trylock()来试图对这个tasklet进行上锁(即设置TASKLET_STATE_RUN位),且只能在上锁成功的情况下才能执行这个tasklet。建议!即使你的程序只在CPU系统上运行,你也要在执行tasklet之前调用tasklet_trylock()宏,以便使你的代码获得良好可移植性。
  在SMP系统中,tasklet_unlock_wait()宏将一直不停地测试TASKLET_STATE_RUN位的值,直到该位的值变为0(即一直等待到解锁),假如:CPU0正在执行tasklet A的代码,在此期间,CPU1也想执行tasklet A的代码,但CPU1发现tasklet A的TASKLET_STATE_RUN位为1,于是它就可以通过tasklet_unlock_wait()宏等待tasklet A被解锁(也即TASKLET_STATE_RUN位被清零)。在单CPU系统中,这是一个空操作。
  宏tasklet_unlock()用来对一个tasklet进行解锁操作,也即将TASKLET_STATE_RUN位清零。在单CPU系统中,这是一个空操作。

3.2.2 使能/禁止一个tasklet

使能与禁止操作往往总是成对地被调用的,tasklet_disable()函数如下(interrupt.h):

static inline void tasklet_disable(struct tasklet_struct *t)
{
	tasklet_disable_nosync(t);
	tasklet_unlock_wait(t);
	smp_mb();
}

函数tasklet_disable_nosync()也是一个静态inline函数,它简单地通过原子操作将count成员变量的值加1。如下所示(interrupt.h):

static inline void tasklet_disable_nosync(struct tasklet_struct *t)
{
	atomic_inc(&t->count);
	smp_mb__after_atomic_inc();
}

函数tasklet_enable()用于使能一个tasklet,如下所示(interrupt.h):

static inline void tasklet_enable(struct tasklet_struct *t)
{
	smp_mb__before_atomic_dec();
	atomic_dec(&t->count);
}

3.3 tasklet描述符的初始化与杀死

函数tasklet_init()用来初始化一个指定的tasklet描述符,其源码如下所示(kernel/softirq.c):

void tasklet_init(struct tasklet_struct *t,
		  void (*func)(unsigned long), unsigned long data)
{
	t->next = NULL;
	t->state = 0;
	atomic_set(&t->count, 0);
	t->func = func;
	t->data = data;
}

函数tasklet_kill()用来将一个已经被调度了的tasklet杀死,即将其恢复到未调度的状态。其源码如下所示(kernel/softirq.c):

void tasklet_kill(struct tasklet_struct *t)
{
	if (in_interrupt())
		pr_notice("Attempt to kill tasklet from interrupt\n");

	while (test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
		do {
			yield();
		} while (test_bit(TASKLET_STATE_SCHED, &t->state));
	}
	tasklet_unlock_wait(t);
	clear_bit(TASKLET_STATE_SCHED, &t->state);
}

3.4 tasklet对列

多个tasklet可以通过tasklet描述符中的next成员指针链接成一个单向对列。为此,Linux专门在头文件include/linux/interrupt.h中定义了数据结构tasklet_head来描述一个tasklet对列的头部指针。如下所示:

//\Linux-5.4\kernel\softirq.c文件中
/*
 * Tasklets
 */
struct tasklet_head
{
	struct tasklet_struct *head;
	struct tasklet_struct **tail;
};

尽管tasklet机制是特定于软中断向量HI_SOFTIRQ和TASKLET_SOFTIRQ的一种实现,但是tasklet机制仍然属于softirq机制的整体框架范围内的,因此,它的设计与实现仍然必须坚持“谁触发,谁执行”的思想。为此,Linux为系统中的每一个CPU都定义了一个tasklet对列头部,来表示应该有各个CPU负责执行的tasklet对列。如下所示(kernel/softirq.c):

//\Linux-5.4\include\linux\percpu-defs.h
/*
 * Variant on the per-CPU variable declaration/definition theme used for
 * ordinary per-CPU variables.
 */
#define DECLARE_PER_CPU(type, name)					\
	DECLARE_PER_CPU_SECTION(type, name, "")

#define DEFINE_PER_CPU(type, name)					\
	DEFINE_PER_CPU_SECTION(type, name, "")
//(kernel/softirq.c)
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);

上面展开,可得:

struct tasklet_head tasklet_vec[NR_CPUS] __cacheline_aligned;
struct tasklet_head tasklet_hi_vec[NR_CPUS] __cacheline_aligned;

其中,tasklet_vec[]数组用于软中断向量TASKLET_SOFTIRQ,而tasklet_hi_vec[]数组则用于软中断向量HI_SOFTIRQ。也即,如果CPUi(0≤i≤NR_CPUS-1)触发了软中断向量TASKLET_SOFTIRQ,那么对列tasklet_vec[i]中的每一个tasklet都将在CPUi服务于软中断向量TASKLET_SOFTIRQ时被CPUi所执行。同样地,如果CPUi(0≤i≤NR_CPUS-1)触发了软中断向量HI_SOFTIRQ,那么队列tasklet_hi_vec[i]中的每一个tasklet都将CPUi在对软中断向量HI_SOFTIRQ进行服务时被CPUi所执行。
  队列tasklet_vec[I]和tasklet_hi_vec[I]中的各个tasklet是怎样被所CPUi所执行的呢?其关键就是软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ的软中断服务程序——tasklet_action()函数和tasklet_hi_action()函数。下面我们就来分析这两个函数。

3.5 软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ

Linux为软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ实现了专用的触发函数和软中断服务函数。

  • 专用的触发函数

tasklet_schedule()函数和tasklet_hi_schedule()函数分别用来在当前CPU上触发软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ,并把指定的tasklet加入当前CPU所对应的tasklet队列中去等待执行。

  • 专用的软中断服务函数

tasklet_action()函数和tasklet_hi_action()函数则分别是软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ的软中断服务函数。在初始化函数softirq_init()中,这两个软中断向量对应的描述符softirq_vec[0]和softirq_vec[6]中的action函数指针就被分别初始化成指向函数tasklet_hi_action()和函数tasklet_action()。

3.5.1软中断向量TASKLET_SOFTIRQ的触发函数tasklet_schedule

该函数实现在include/linux/interrupt.h头文件中,是一个inline函数。其源码如下所示:

//Linux-5.4\include\linux\interrupt.h
static inline void tasklet_schedule(struct tasklet_struct *t)
{
	if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
		__tasklet_schedule(t);
}
//\Linux-5.4\kernel\softirq.c
void __tasklet_schedule(struct tasklet_struct *t)
{
	__tasklet_schedule_common(t, &tasklet_vec,
				  TASKLET_SOFTIRQ);
}
//\Linux-5.4\kernel\softirq.c
static void __tasklet_schedule_common(struct tasklet_struct *t,
				      struct tasklet_head __percpu *headp,
				      unsigned int softirq_nr)
{
	struct tasklet_head *head;
	unsigned long flags;

	local_irq_save(flags);
	head = this_cpu_ptr(headp);
	t->next = NULL;
	*head->tail = t;
	head->tail = &(t->next);
	raise_softirq_irqoff(softirq_nr);
	local_irq_restore(flags);
}
  • 调用test_and_set_bit()函数将待调度的tasklet的state成员变量的bit[0]位(也即TASKLET_STATE_SCHED位)设置为1,该函数同时还返回TASKLET_STATE_SCHED位的原有值。因此如果bit[0]为的原有值已经为1,那就说明这个tasklet已经被调度到另一个CPU上去等待执行了。由于一个tasklet在某一个时刻只能由一个CPU来执行,因此tasklet_schedule()函数什么也不做就直接返回了。否则,就继续下面的调度操作。

  • 首先,调用local_irq_save()函数来关闭当前CPU的中断,以保证下面的步骤在当前CPU上原子地被执行。

  • 然后,将待调度的tasklet添加到当前CPU对应的tasklet队列的尾部。

  • 接着,调用raise_softirq_irqoff函数在当前CPU上触发软中断请求TASKLET_SOFTIRQ。

  • 最后,调用local_irq_restore()函数来开当前CPU的中断。

3.5.2软中断向量TASKLET_SOFTIRQ的服务程序tasklet_action

函数tasklet_action()是tasklet机制与软中断向量TASKLET_SOFTIRQ的联系纽带。正是该函数将当前CPU的tasklet队列中的各个tasklet放到当前CPU上来执行的。该函数实现在kernel/softirq.c文件中,其源代码如下:

static __latent_entropy void tasklet_action(struct softirq_action *a)
{
	tasklet_action_common(a, this_cpu_ptr(&tasklet_vec), TASKLET_SOFTIRQ);
}

static void tasklet_action_common(struct softirq_action *a,
				  struct tasklet_head *tl_head,
				  unsigned int softirq_nr)
{
	struct tasklet_struct *list;

	local_irq_disable();
	list = tl_head->head;
	tl_head->head = NULL;
	tl_head->tail = &tl_head->head;
	local_irq_enable();

	while (list) {
		struct tasklet_struct *t = list;

		list = list->next;

		if (tasklet_trylock(t)) {
			if (!atomic_read(&t->count)) {
				if (!test_and_clear_bit(TASKLET_STATE_SCHED,
							&t->state))
					BUG();
				t->func(t->data);
				tasklet_unlock(t);
				continue;
			}
			tasklet_unlock(t);
		}

		local_irq_disable();
		t->next = NULL;
		*tl_head->tail = t;
		tl_head->tail = &t->next;
		__raise_softirq_irqoff(softirq_nr);
		local_irq_enable();
	}
}
  • 首先,在当前CPU关中断的情况下,“原子”地读取当前CPU的tasklet队列头部指针,将其保存到局部变量list指针中,然后将当前CPU的tasklet队列头部指针设置为NULL,以表示理论上当前CPU将不再有tasklet需要执行(但最后的实际结果却并不一定如此,下面将会看到)。
  • 然后,用一个while{}循环来遍历由list所指向的tasklet队列,队列中的各个元素就是将在当前CPU上执行的tasklet。循环体的执行步骤如下:
  • 用指针t来表示当前队列元素,即当前需要执行的tasklet。
  • 更新list指针为list->next,使它指向下一个要执行的tasklet。
  • 用tasklet_trylock()宏试图对当前要执行的tasklet(由指针t所指向)进行加锁,如果加锁成功(当前没有任何其他CPU正在执行这个tasklet),则用原子读函数atomic_read()进一步判断count成员的值。如果count为0,说明这个tasklet是允许执行的,于是:
  1. 先清除TASKLET_STATE_SCHED位;
  2. 然后,调用这个tasklet的可执行函数func;
  3. 调用宏tasklet_unlock()来清除TASKLET_STATE_RUN位 ;
  4. 最后,执行continue语句跳过下面的步骤,回到while循环继续遍历队列中的下一个元素。如果count不为0,说明这个tasklet是禁止运行的,于是调用tasklet_unlock()清除前面用tasklet_trylock()设置的TASKLET_STATE_RUN位。

3.6 tasklet使用总结

1、声明和使用小任务大多数情况下,为了控制一个常用的硬件设备,小任务机制是实现下半部的最佳选择。小任务可以动态创建,使用方便,执行起来也比较快。我们既可以静态地创建小任务,也可以动态地创建它。选择那种方式取决于到底是想要对小任务进行直接引用还是一个间接引用。如果准备静态地创建一个小任务(也就是对它直接引用),使用下面两个宏中的一个:
DECLARE_TASKLET(name,func, data)
DECLARE_TASKLET_DISABLED(name,func, data)
这两个宏都能根据给定的名字静态地创建一个tasklet_struct结构。当该小任务被调度以后,给定的函数func会被执行,它的参数由data给出。这两个宏之间的区别在于引用计数器的初始值设置不同。第一个宏把创建的小任务的引用计数器设置为0,因此,该小任务处于激活状态。另一个把引用计数器设置为1,所以该小任务处于禁止状态。例如:
DECLARE_TASKLET(my_tasklet,my_tasklet_handler, dev);
这行代码其实等价于
struct tasklet_struct my_tasklet = { NULL, 0, ATOMIC_INIT(0),tasklet_handler,dev};
这样就创建了一个名为my_tasklet的小任务,其处理程序为tasklet_handler,并且已被激活。当处理程序被调用的时候,dev就会被传递给它。

2、编写自己的小任务处理程序小任务处理程序必须符合如下的函数类型:

void tasklet_handler(unsigned long data)
由于小任务不能睡眠,因此不能在小任务中使用信号量或者其它产生阻塞的函数。但是小任务运行时可以响应中断。

3、调度自己的小任务通过调用tasklet_schedule()函数并传递给它相应的tasklt_struct指针,该小任务就会被调度以便适当的时候执行:
tasklet_schedule(&my_tasklet); /*把my_tasklet标记为挂起 */
在小任务被调度以后,只要有机会它就会尽可能早的运行。在它还没有得到运行机会之前,如果一个相同的小任务又被调度了,那么它仍然只会运行一次。

可以调用tasklet_disable()函数来禁止某个指定的小任务。如果该小任务当前正在执行,这个函数会等到它执行完毕再返回。调用tasklet_enable()函数可以激活一个小任务,如果希望把以DECLARE_TASKLET_DISABLED()创建的小任务激活,也得调用这个函数,如:

tasklet_disable(&my_tasklet); /小任务现在被禁止,这个小任务不能运行/

tasklet_enable(&my_tasklet); /* 小任务现在被激活*/

也可以调用tasklet_kill()函数从挂起的队列中去掉一个小任务。该函数的参数是一个指向某个小任务的tasklet_struct的长指针。在小任务重新调度它自身的时候,从挂起的队列中移去已调度的小任务会很有用。这个函数首先等待该小任务执行完毕,然后再将它移去。

4、tasklet的简单用法

下面是tasklet的一个简单应用,以模块的形成加载。

#include <linux/module.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <linux/kdev_t.h>
#include <linux/cdev.h>
#include <linux/kernel.h>
#include <linux/interrupt.h>
 
static struct  t asklet_struct my_tasklet;
 
static void tasklet_handler (unsigned long d ata)
{
        printk(KERN_ALERT,"tasklet_handler is running./n");
}
 
static int __init test_init(void)
{
        tasklet_init(&my_tasklet,tasklet_handler,0);
        tasklet_schedule(&my_tasklet);
        return0;
}
 
static  void __exit test_exit(void)
{
        tasklet_kill(&tasklet);
        printk(KERN_ALERT,"test_exit is running./n");
}
MODULE_LICENSE("GPL");
 
module_init(test_init);
module_exit(test_exit);

从这个例子可以看出,所谓的小任务机制是为下半部函数的执行提供了一种执行机制,也就是说,推迟处理的事情是由tasklet_handler实现,何时执行,经由小任务机制封装后交给内核去处理。

四、中断处理的工作队列机制

工作队列(work queue)是另外一种将工作推后执行的形式,它和前面讨论的tasklet有所不同。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行。这样,通过工作队列执行的代码能占尽进程上下文的所有优势。最重要的就是工作队列允许被重新调度甚至是睡眠。
那么,什么情况下使用工作队列,什么情况下使用tasklet。如果推后执行的任务需要睡眠,那么就选择工作队列;如果推后执行的任务不需要睡眠,那么就选择tasklet。另外,如果需要用一个可以重新调度的实体来执行你的下半部处理,也应该使用工作队列。它是唯一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在需要获得大量的内存时、在需要获取信号量时,在需要执行阻塞式的I/O操作时,它都会非常有用。如果不需要用一个内核线程来推后执行工作,那么就考虑使用tasklet。

4.1 工作、工作队列

如前所述,我们把推后执行的任务叫做工作(work),描述它的数据结构为work_struct,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events,自己也可以创建自己的工作者线程。表示工作的数据结构用<linux/workqueue.h>中定义的work_struct结构表示:

struct work_struct {
	atomic_long_t data;
	struct list_head entry;	// 连接所有工作的链表
	work_func_t func; 		// 要执行的函数
#ifdef CONFIG_LOCKDEP
	struct lockdep_map lockdep_map;
#endif
};
typedef void (*work_func_t)(struct work_struct *work);

这些结构被连接成链表。当一个工作者线程被唤醒时,它会执行它的链表上的所有工作。工作被执行完毕,它就将相应的work_struct对象从链表上移去。当链表上不再有对象的时候,它就会继续休眠。表示工作队列的数据结构用<kernel/workqueue.c>中定义的workqueue_struct:

/*
 * The externally visible workqueue.  It relays the issued work items to
 * the appropriate worker_pool through its pool_workqueues.
 */
struct workqueue_struct {
	struct list_head	pwqs;		/* WR: all pwqs of this wq */
	struct list_head	list;		/* PR: list of all workqueues */

	struct mutex		mutex;		/* protects this wq */
	int			work_color;	/* WQ: current work color */
	int			flush_color;	/* WQ: current flush color */
	atomic_t		nr_pwqs_to_flush; /* flush in progress */
	struct wq_flusher	*first_flusher;	/* WQ: first flusher */
	struct list_head	flusher_queue;	/* WQ: flush waiters */
	struct list_head	flusher_overflow; /* WQ: flush overflow list */

	struct list_head	maydays;	/* MD: pwqs requesting rescue */
	struct worker		*rescuer;	/* I: rescue worker */

	int			nr_drainers;	/* WQ: drain in progress */
	int			saved_max_active; /* WQ: saved pwq max_active */

	struct workqueue_attrs	*unbound_attrs;	/* PW: only for unbound wqs */
	struct pool_workqueue	*dfl_pwq;	/* PW: only for unbound wqs */

#ifdef CONFIG_SYSFS
	struct wq_device	*wq_dev;	/* I: for sysfs interface */
#endif
#ifdef CONFIG_LOCKDEP
	char			*lock_name;
	struct lock_class_key	key;
	struct lockdep_map	lockdep_map;
#endif
	char			name[WQ_NAME_LEN]; /* I: workqueue name */

	/*
	 * Destruction of workqueue_struct is RCU protected to allow walking
	 * the workqueues list without grabbing wq_pool_mutex.
	 * This is used to dump all workqueues from sysrq.
	 */
	struct rcu_head		rcu;

	/* hot fields used during command issue, aligned to cacheline */
	unsigned int		flags ____cacheline_aligned; /* WQ: WQ_* flags */
	struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
	struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */
};

4.2 创建推后的工作

4.2.1 静态地创建工作(work_struct)

要使用工作队列,首先要做的是创建一些需要推后完成的工作。可以通过DECLARE_WORK在编译时静态地建该结构:

DECLARE_WORK(name, func); 或者INIT_WORK(_work, _func)

其定义如下:

#define DECLARE_WORK(n, f)					\
	struct work_struct n = __WORK_INITIALIZER(n, f)

举例如下:

static void do_poweroff(struct work_struct *dummy)
{
	kernel_power_off();
}
 
static DECLARE_WORK(poweroff_work, do_poweroff);

即创建了一个全局静态变量:static work_struct poweroff_work,且被初始化了,其执行函数为do_poweroff。

4.2.2 动态初始化工作(work_struct)

先定义一具struct work_struct 变量,在需要使用时调用INIT_WORK进行初始化,然后便可以使用。

#define INIT_WORK(_work, _func)					\
	do {							\
		__INIT_WORK((_work), (_func), 0);		\
	} while (0)

举例:

void __cfg80211_scan_done(struct work_struct *wk)
{
	struct cfg80211_registered_device *rdev;
 
	rdev = container_of(wk, struct cfg80211_registered_device,
			    scan_done_wk);
 
	cfg80211_lock_rdev(rdev);
	___cfg80211_scan_done(rdev, false);
	cfg80211_unlock_rdev(rdev);
}
 
struct cfg80211_registered_device {
 
	struct work_struct scan_done_wk;
	struct work_struct sched_scan_results_wk;
	struct work_struct conn_work;
	struct work_struct event_work;
	struct cfg80211_wowlan *wowlan;
}
struct cfg80211_registered_device *rdev;
rdev = kzalloc(alloc_size, GFP_KERNEL);
 
INIT_WORK(&rdev->scan_done_wk, __cfg80211_scan_done);  // 其执行函数为: __cfg80211_scan_done
INIT_WORK(&rdev->sched_scan_results_wk, __cfg80211_sched_scan_results);

4.3 对工作进行调度

现在工作已经被创建,我们可以调度它了。想要把给定工作的待处理函数提交给缺省的events工作线程,只需调用: int schedule_work(struct work_struct *work);
它把work放入全局工作队列:system_wq,其定义如下:

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * Returns %false if @work was already on the kernel-global workqueue and
 * %true otherwise.
 *
 * This puts a job in the kernel-global workqueue if it was not already
 * queued and leaves it in the same position on the kernel-global
 * workqueue otherwise.
 */
static inline bool schedule_work(struct work_struct *work)
{
	return queue_work(system_wq, work);
}
/* System-wide workqueues which are always present.
 *
 * system_wq is the one used by schedule[_delayed]_work[_on]().
 * Multi-CPU multi-threaded.  There are users which expect relatively
 * short queue flush time.  Don't queue works which can run for too
 * long.
 */
extern struct workqueue_struct *system_wq;

queue_work:把一个工作放入工作队列:

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns %false if @work was already on a queue, %true otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU dies
 * it can be processed by another CPU.
 */
static inline bool queue_work(struct workqueue_struct *wq,
			      struct work_struct *work)
{
	return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}

/**
 * queue_work_on - queue work on specific cpu
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @work: work to queue
 *
 * We queue the work to a specific CPU, the caller must ensure it
 * can't go away.
 *
 * Return: %false if @work was already on a queue, %true otherwise.
 */
bool queue_work_on(int cpu, struct workqueue_struct *wq,
		   struct work_struct *work)
{
	bool ret = false;
	unsigned long flags;

	local_irq_save(flags);

	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
		__queue_work(cpu, wq, work);
		ret = true;
	}

	local_irq_restore(flags);
	return ret;
}

把work放入工作队列,work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。有时候并不希望工作马上就被执行,而是希望它经过一段延迟以后再执行。在这种情况下,可以调度它在指定的时间执行:

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
static inline bool schedule_delayed_work(struct delayed_work *dwork,
					 unsigned long delay)
{
	return queue_delayed_work(system_wq, dwork, delay);
}

#define DECLARE_DELAYED_WORK(n, f)					\
	struct delayed_work n = __DELAYED_WORK_INITIALIZER(n, f, 0)

#define INIT_DELAYED_WORK(_work, _func)					\
	__INIT_DELAYED_WORK(_work, _func, 0)

4.4 创建工作者线程

工作放入工作队列之后,由管理此工作队列的工作者来执行这些work,通过alloc_workqueue或create_singlethread_workqueue来创建工作者线程,它最后调用kthread_create创建线程,其线程名为alloc_workqueue中指定的name,其举例如下:

/**
 * workqueue_init_early - early init for workqueue subsystem
 *
 * This is the first half of two-staged workqueue subsystem initialization
 * and invoked as soon as the bare basics - memory allocation, cpumasks and
 * idr are up.  It sets up all the data structures and system workqueues
 * and allows early boot code to create workqueues and queue/cancel work
 * items.  Actual work item execution starts only after kthreads can be
 * created and scheduled right before early initcalls.
 */
int __init workqueue_init_early(void)
{
	int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
	int hk_flags = HK_FLAG_DOMAIN | HK_FLAG_WQ;
	int i, cpu;

	WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));

	BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
	cpumask_copy(wq_unbound_cpumask, housekeeping_cpumask(hk_flags));

	pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);

	/* initialize CPU pools */
	for_each_possible_cpu(cpu) {
		struct worker_pool *pool;

		i = 0;
		for_each_cpu_worker_pool(pool, cpu) {
			BUG_ON(init_worker_pool(pool));
			pool->cpu = cpu;
			cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
			pool->attrs->nice = std_nice[i++];
			pool->node = cpu_to_node(cpu);

			/* alloc pool ID */
			mutex_lock(&wq_pool_mutex);
			BUG_ON(worker_pool_assign_id(pool));
			mutex_unlock(&wq_pool_mutex);
		}
	}

	/* create default unbound and ordered wq attrs */
	for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
		struct workqueue_attrs *attrs;

		BUG_ON(!(attrs = alloc_workqueue_attrs()));
		attrs->nice = std_nice[i];
		unbound_std_wq_attrs[i] = attrs;

		/*
		 * An ordered wq should have only one pwq as ordering is
		 * guaranteed by max_active which is enforced by pwqs.
		 * Turn off NUMA so that dfl_pwq is used for all nodes.
		 */
		BUG_ON(!(attrs = alloc_workqueue_attrs()));
		attrs->nice = std_nice[i];
		attrs->no_numa = true;
		ordered_wq_attrs[i] = attrs;
	}

	system_wq = alloc_workqueue("events", 0, 0);
	system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
	system_long_wq = alloc_workqueue("events_long", 0, 0);
	system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
					    WQ_UNBOUND_MAX_ACTIVE);
	system_freezable_wq = alloc_workqueue("events_freezable",
					      WQ_FREEZABLE, 0);
	system_power_efficient_wq = alloc_workqueue("events_power_efficient",
					      WQ_POWER_EFFICIENT, 0);
	system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
					      WQ_FREEZABLE | WQ_POWER_EFFICIENT,
					      0);
	BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
	       !system_unbound_wq || !system_freezable_wq ||
	       !system_power_efficient_wq ||
	       !system_freezable_power_efficient_wq);

	return 0;
}


/**
 * workqueue_init - bring workqueue subsystem fully online
 *
 * This is the latter half of two-staged workqueue subsystem initialization
 * and invoked as soon as kthreads can be created and scheduled.
 * Workqueues have been created and work items queued on them, but there
 * are no kworkers executing the work items yet.  Populate the worker pools
 * with the initial workers and enable future kworker creations.
 */
int __init workqueue_init(void)
{
	struct workqueue_struct *wq;
	struct worker_pool *pool;
	int cpu, bkt;

	/*
	 * It'd be simpler to initialize NUMA in workqueue_init_early() but
	 * CPU to node mapping may not be available that early on some
	 * archs such as power and arm64.  As per-cpu pools created
	 * previously could be missing node hint and unbound pools NUMA
	 * affinity, fix them up.
	 *
	 * Also, while iterating workqueues, create rescuers if requested.
	 */
	wq_numa_init();

	mutex_lock(&wq_pool_mutex);

	for_each_possible_cpu(cpu) {
		for_each_cpu_worker_pool(pool, cpu) {
			pool->node = cpu_to_node(cpu);
		}
	}

	list_for_each_entry(wq, &workqueues, list) {
		wq_update_unbound_numa(wq, smp_processor_id(), true);
		WARN(init_rescuer(wq),
		     "workqueue: failed to create early rescuer for %s",
		     wq->name);
	}

	mutex_unlock(&wq_pool_mutex);

	/* create the initial workers */
	for_each_online_cpu(cpu) {
		for_each_cpu_worker_pool(pool, cpu) {
			pool->flags &= ~POOL_DISASSOCIATED;
			BUG_ON(!create_worker(pool));
		}
	}

	hash_for_each(unbound_pool_hash, bkt, pool, hash_node)
		BUG_ON(!create_worker(pool));

	wq_online = true;
	wq_watchdog_init();

	return 0;
}

如:cfg80211_wq = create_singlethread_workqueue(“cfg80211”);创建了一个名为cfg80211的kernel线程。

4.5 工作队列的简单应用

#include <linux/module.h> 
#include <linux/init.h> 
#include <linux/workqueue.h> 
static struct workqueue_struct *queue = NULL; 
static struct work_struct work; 
static void work_handler(struct work_struct *data) 
{
  	printk(KERN_ALERT "work handler function.\n"); 
}

static int __init test_init(void) 
{
	queue = create_singlethread_workqueue("helloworld"); 
	/*创建一个单线程的工作队列*/         
	if (!queue)                 
		goto err;         
	INIT_WORK(&work, work_handler);         
	schedule_work(&work);         
	return 0; 
	err:         
	return -1; 
} 
static void __exit test_exit(void) {
    destroy_workqueue(queue); 
} 
    
MODULE_LICENSE("GPL"); 
module_init(test_init); 
module_exit(test_exit);

参考

[1] https://blog.csdn.net/myarrow/article/details/9287169

[2] https://blog.csdn.net/yhb1047818384/article/details/63687126

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Linux中断处理的“下半部”机制 的相关文章

  • Linux 中 m 标志和 o 标志将存储在哪里

    我想知道最近收到的路由器通告的 m 标志和 o 标志的值 从内核源代码中我知道存储了 m 标志和 o 标志 Remember the managed otherconf flags from most recently received R
  • SSH,运行进程然后忽略输出

    我有一个命令可以使用 SSH 并在 SSH 后运行脚本 该脚本运行一个二进制文件 脚本完成后 我可以输入任意键 本地终端将恢复到正常状态 但是 由于该进程仍在我通过 SSH 连接的计算机中运行 因此任何时候它都会登录到stdout我在本地终
  • 相当于Linux中的导入库

    在 Windows C 中 当您想要链接 DLL 时 您必须提供导入库 但是在 GNU 构建系统中 当您想要链接 so 文件 相当于 dll 时 您就不需要链接 为什么是这样 是否有等效的 Windows 导入库 注意 我不会谈论在 Win
  • FileOutputStream.close() 中的设备 ioctl 不合适

    我有一些代码可以使用以下命令将一些首选项保存到文件中FileOutputStream 这是我已经写了一千遍的标准代码 FileOutputStream out new FileOutputStream file try BufferedOu
  • 为什么 Linux 没有 DirectX API?

    在考虑现代显卡的 Windows 系统上 DirectX API 的驱动程序端实现时 我想知道为什么此实现在非 Windows 系统 尤其是 Linux 上不可用 由于明显缺乏此功能 我只能假设有一个我无视的充分理由 但在我的原始理解中 我
  • 调用 printf 系统子例程在汇编代码中输出整数错误[重复]

    这个问题在这里已经有答案了 来回 在windows7控制台窗口中运行gcc s2 asm 然后生成一个exe文件 运行a exe 然后崩溃 为什么 s2 asm 代码由以下源代码生成 int m m 1 iprint m s2 asm请参考
  • Google BQ:运行参数化查询,其中参数变量是 BQ 表目标

    我正在尝试从 Linux 命令行为 BQ 表目标运行 SQL 此 SQL 脚本将用于多个日期 客户端和 BQ 表目标 因此这需要在我的 BQ API 命令行调用中使用参数 标志 parameter 现在 我已经点击此链接来了解参数化查询 h
  • Linux 上的 Pervasive ODBC 错误 [01000][unixODBC][驱动程序管理器]无法打开 lib '/usr/local/psql/lib/odbcci.so':找不到文件

    我正在尝试让 Pervasive v10 客户端 ODBC 在 Centos 6 上运行 据我所知 没有 64 位 ODBC 客户端 因此我必须使用 32 位客户端 我终于成功安装了它 但尝试使用时出现以下错误 isql v mydsn 0
  • 如何在linux中以编程方式获取dir的大小?

    我想通过 C 程序获取 linux 中特定目录的确切大小 我尝试使用 statfs path struct statfs 但它没有给出确切的大小 我也尝试过 stat 但它返回任何目录的大小为 4096 请建议我如何获取 dir 的确切大小
  • C 语言的符号表

    我目前正在开发一种执行模式匹配的静态分析工具 我在用Flex https github com westes flex生成词法分析器 我编写了代码来管理符号表 我不太有经验C 所以我决定将符号表实现为线性链表 include
  • 尽管 if 语句,Visual Studio 仍尝试包含 Linux 标头

    我正在尝试创建一个强大的头文件 无需更改即可在 Windows 和 Linux 上进行编译 为此 我的包含内容中有一个 if 语句 如下所示 if defined WINDOWS include
  • 使用 MAX_ORDER / 包含 mmzone.h

    根据https www kernel org doc Documentation networking packet mmap txt https www kernel org doc Documentation networking pa
  • 从 Xlib 转换为 xcb

    我目前正在将我的一个应用程序从 Xlib 移植到 libxcb 但在查找有关我有时使用的 XInput2 扩展的信息时遇到了一些麻烦 libxcb 中有 XInput2 实现吗 如果是的话 在哪里可以找到文档 目前我在使用此功能时遇到问题
  • Bash - 在与当前终端分开的另一个终端中启动命令的新实例

    我有一个简单的 bash 脚本 test sh 设置如下 bin bash args if args 0 check capture then watch n 1 ls lag home user capture0 watch n 1 ls
  • 如何使用waf构建共享库?

    我想使用构建一个共享库waf http code google com p waf 因为它看起来比 GNU 自动工具更容易 更简洁 到目前为止 我实际上有几个与我开始编写的 wscript 有关的问题 VERSION 0 0 1 APPNA
  • 与 pthread 的进程间互斥

    我想使用一个互斥体 它将用于同步对两个不同进程共享的内存中驻留的某些变量的访问 我怎样才能做到这一点 执行该操作的代码示例将非常感激 以下示例演示了 Pthread 进程间互斥体的创建 使用和销毁 将示例推广到多个进程作为读者的练习 inc
  • 错误:“rjags”的包或命名空间加载失败

    在终端的 conda 环境之一中 我能够成功安装包 rjags 但是 当我在该环境中运行 R 并运行库 rjags 时 出现以下错误 加载所需的包 coda 错误 rjags 的包或命名空间加载失败 rjags 的 loadNamespac
  • Apache 访问 Linux 中的 NTFS 链接文件夹

    在 Debian jessie 中使用 Apache2 PHP 当我想在 Apache 的文档文件夹 var www 中创建一个新的小节时 我只需创建一个指向我的 php 文件所在的外部文件夹的链接 然后只需更改该文件夹的所有者和权限文件夹
  • 这种文件锁定方法可以接受吗?

    我们有 10 个 Linux 机器 每周必须运行 100 个不同的任务 这些计算机主要在我们晚上在家时执行这些任务 我的一位同事正在开发一个项目 通过使用 Python 自动启动任务来优化运行时间 他的程序将读取任务列表 抓取一个打开的任务
  • 如何在 Mac OSX Mavericks 中正确运行字符串工具?

    如何在 Mac OSX Mavericks 中正确运行字符串工具 我尝试按照我在网上找到的示例来运行它 strings a UserParser class 但我收到此错误 错误 Applications Xcode app Content

随机推荐

  • 给定两个字符串str1和str2,查找str2在str1中出现的位置

    给定string str1和string str2 xff0c 编写一个库函数 xff0c 返回str2在str1中的位置 如 xff1a str1为 34 ABCDLANCEXYZ 34 xff0c str2为 34 LANCE 34 x
  • 中国交通标志检测数据集

    版权声明 xff1a 本文为转发问 xff0c 原文见博客 https blog csdn net dong ma article details 84339007 中国交通标志检测数据集 xff08 CCTSDB xff09 来源于 A
  • CentOS 修改ls目录的颜色

    修改ls目录的颜色 linux系统默认目录颜色是蓝色的 xff0c 在黑背景下看不清楚 xff0c 可以通过以下2种方法修改ls查看的颜色 bash profile文件在用的根目录下 xff0c ls al可以看到 方法一 xff1a 1
  • Tiny210(S5PV210) U-BOOT(六)----DDR内存配置

    上次讲完了Nand Flash的低级初始化 xff0c 然后Nand Flash的操作主要是在board init f nand xff0c 中 xff0c 涉及到将代码从Nand Flash中copy到DDR中 xff0c 这个放到后面实
  • NAND FLASH命名规则

    基于网络的一个修订版 三星的pure nandflash xff08 就是不带其他模块只是nandflash存储芯片 xff09 的命名规则如下 xff1a 1 Memory K 2 NANDFlash 9 3 Small Classifi
  • s3c6410 DMA

    S3C6410中DMA操作步骤 xff1a 1 决定使用安全DMAC SDMAC 还是通用DMAC DMAC xff1b 2 开始相应DMAC的系统时钟 xff0c 并关闭另外一组的时钟 xff08 系统默认开启SDMA时钟 xff09 x
  • Visual Studio和VS Code的区别

    1 Visual Studio简介 xff1a 是一个集成开发环境 IDE xff0c 安装完成后就能直接用 xff0c 编译工具 xff0c 调试工具 xff0c 各个语言的开发工具 xff0c 都是已经配置好的 xff0c 开箱即用 适
  • 博客转移

    由于CSDN 文章不间断的会丢失图片 xff0c 然后逼格也不够高 xff0c 导致几年都没有写博客 xff0c 全部是记录至印象笔记中 xff0c 但是久了也不太好 xff0c 所以最近搞了一个自己的个人博客 xff0c 以后文章全部写至
  • 安装qt-everywhere-opensource-src-4.8.6

    1 下载 qt everywhere opensource src 4 8 6 http mirrors hust edu cn qtproject official releases qt 4 8 4 8 6 qt everywhere
  • CentOS6.5上安装qt-creator-opensource-linux-x86-3.1.2.run

    1 qt creator opensource linux x86 3 1 2 run的下载 wget http mirrors hustunique com qt official releases qtcreator 3 1 3 1 2
  • atoi()函数

    atoi 函数 原型 xff1a int atoi xff08 const char nptr xff09 用法 xff1a include lt stdlib h gt 功能 xff1a 将字符串转换成整型数 xff1b atoi 会扫描
  • ubuntu中printk打印信息

    1 设置vmware添加seria port 使用文件作为串口 2 启动ubuntu xff0c 修改 etc default grub GRUB CMDLINE LINUX DEFAULT 61 34 34 GRUB CMDLINE LI
  • 静态库、共享库、动态库概念?

    通常库分为 xff1a 静态库 共享库 xff0c 动态加载库 下面分别介绍 一 静态库 xff1a 1 概念 xff1a 静态库就是一些目标文件的集合 xff0c 以 a结尾 静态库在程序链接的时候使用 xff0c 链接器会将程序中使用
  • 链表——怎么写出正确的链表?

    链表 相比数组 xff0c 链表不需要一块连续的内存空间 xff0c 而是通过指针将一组零散的内存块串联起来使用 xff0c 而这里的内存块就叫做节点 xff0c 一般节点除了保存data还会保存下一个节点的地址也就是指针 单链表 头节点
  • 【STM32】STM32 变量存储在片内FLASH的指定位置

    在这里以STM32L4R5为例 xff08 官方出的DEMO板 xff09 xff0c 将变量存储在指定的片内FLASH地址 xff08 0x081F8000 xff09 一 MDK Keil软件操作 uint8 t version spa
  • 【STM32】 利用paho MQTT&WIFI 连接阿里云

    ST联合阿里云推出了云接入的相关培训 xff08 基于STM32的端到端物联网全栈开发 xff09 xff0c 所采用的的板卡为NUCLEO L4R5ZI板 xff0c 实现的主要功能为采集温湿度传感器上传到阿里云物联网平台 xff0c 并
  • 【STM32 】通过ST-LINK utility 实现片外FLASH的烧写

    目录 前言 一 例程参考及讲解 1 1 Loader Src c文件 1 2 Dev Inf c文件 二 程序修改 三 实测 参考 前言 在单片机的实际应用中 xff0c 通常会搭载一些片外FLASH芯片 xff0c 用于存储系统的一些配置
  • 基于FFmpeg的推流器(UDP推流)

    一 推流器对于输入输出文件无要求 1 输入文件可为网络流URL xff0c 可以实现转流器 2 将输入的文件改为回调函数 xff08 内存读取 xff09 的形式 xff0c 可以推送内存中的视频数据 3 将输入文件改为系统设备 xff08
  • 十进制转十六进制的C语言实现

    include lt stdio h gt include lt stdlib h gt include lt string h gt void reversestr char source char target unsigned int
  • Linux中断处理的“下半部”机制

    前言 中断分为硬件中断 xff0c 软件中断 中断的处理原则主要有两个 xff1a 一个是不能嵌套 xff0c 另外一个是越快越好 在Linux中 xff0c 分为中断处理采用 上半部 和 下半部 处理机制 一 中断处理 下半部 机制 中断