Username: Password:

(转载)Linux内核同步方式
来源: ChinaUnix博客  作者: 发布时间:2008-01-01 20:08:00


同步通常是为了达到多线程协同的目的而设计的一种机制,通常包含异步信号机制和互斥机制作为其实现的底层。在Linux 2.4内核中也有相应的技术实现,包括信号量、自旋锁、原子操作和等待队列,其中原子操作和等待队列又是实现信号量的底层。
等待队列和异步信号
wait queue很早就作为一个基本的功能单位出现在Linux内核里了,他以队列为基础数据结构,和进程调度机制紧密结合,能够用于实现核心的异步事件通知机制。我们从他的使用范例着手,看看等待队列是怎样实现异步信号功能的。
在核心运行过程中,经常会因为某些条件不满足而需要挂起当前线程,直至条件满足了才继续执行。在2.4内核中提供了一组新接口来实现这样的功能,下面的代码节选自kernel/printk.c:
    unsigned long log_size;
1:  DECLARE_WAIT_QUEUE_HEAD(log_wait);...
4:  spinlock_t console_lock = SPIN_LOCK_UNLOCKED;...
    int do_syslog(int type,char *buf,int len){
        ...
2:      error=wait_event_interruptible(log_wait,log_size);
        if(error)
             goto out;
        ...
5:      spin_lock_irq(&console_lock);
        ...
        log_size--;
        ...
6:        spin_unlock_irq(&console_lock);
        ...
    }
    asmlinkage int printk(const char *fmt,...){
        ...
7:        spin_lock_irqsave(&console_lock,flags);
        ...
        log_size++;...
8:        spin_unlock_irqrestore(&console_lock,flags);
3:      wake_up_interruptible(&log_wait);
        ...
    }
   
这段代码实现了printk调用和syslog之间的同步,syslog需要等待printk送数据到缓冲区,因此,在2:处等待log_size非0;而printk一边传送数据,一边增加log_size的值,完成后唤醒在log_wait上等待的任何线程(这个线程不是用户空间的线程概念,而是核内的一个执行序列)。执行了3:的wake_up_interruptible()后,2:处的wait_event_interruptible()返回0,从而进入syslog的实际动作。
1:是定义log_wait全局变量的宏调用。
在实际操作log_size全局变量的时候,还使用了spin_lock自旋锁来实现互斥,关于自旋锁,这里暂不作解释,但从这段代码中已能够清楚的知道他的使用方法了。
任何wait queue使用上的技巧体现在wait_event_interruptible()的实现上,代码位于include/linux/sched.h中,前置数字表示行号:
779 #define __wait_event_interruptible(wq, condition, ret)                  \
780 do {                                                                    \
781         wait_queue_t __wait;                                            \
782         init_waitqueue_entry(&__wait, current);                         \
783                                                                         \
784         add_wait_queue(&wq, &__wait);                                   \
785         for (;;) {                                                      \
786                 set_current_state(TASK_INTERRUPTIBLE);                  \
787                 if (condition)                                          \
788                         break;                                          \
789                 if (!signal_pending(current)) {                         \
790                         schedule();                                     \
791                         continue;                                       \
792                 }                                                       \
793                 ret = -ERESTARTSYS;                                     \
794                 break;                                                  \
795         }                                                               \
796         current->state = TASK_RUNNING;                                  \
797         remove_wait_queue(&wq, &__wait);                                \
798 } while (0)
799         
800 #define wait_event_interruptible(wq, condition)                         \
801 ({                                                                      \
802         int __ret = 0;                                                  \
803         if (!(condition))                                               \
804                 __wait_event_interruptible(wq, condition, __ret);       \
805         __ret;                                                          \
806 })

在wait_event_interruptible()中首先判断condition是不是已满足,假如是则直接返回0,否则调用__wait_event_interruptible(),并用__ret来存放返回值。__wait_event_interruptible()首先定义并初始化一个wait_queue_t变量__wait,其中数据为当前进程结构current(struct task_struct),并把__wait入队。在无限循环中,__wait_event_interruptible()将本进程置为可中断的挂起状态,反复检查condition是否成立,假如成立则退出,假如不成立则继续休眠;条件满足后,即把本进程运行状态置为运行态,并将__wait从等待队列中清除掉,从而进程能够调度运行。假如进程当前有异步信号(POSIX的),则返回-ERESTARTSYS。
挂起的进程并不会自动转入运行的,因此,还需要一个唤醒动作,这个动作由wake_up_interruptible()完成,他将遍历作为参数传入的log_wait等待队列,将其中任何的元素(通常都是task_struct)置为运行态,从而可被调度到,执行__wait_event_interruptible()中的代码。
DECLARE_WAIT_QUEUE_HEAD(log_wait)经过宏展开后就是定义了一个log_wait等待队列头变量:
struct __wait_queue_head log_wait = {
        lock:        SPIN_LOCK_UNLOCKED,
        task_list:      { &log_wait.task_list, &log_wait.task_list }
}

其中task_list是struct list_head变量,包括两个list_head指针,一个next、一个prev,这里把他们初始化为自身,属于队列实现上的技巧,其细节能够参阅关于内核list数据结构的讨论,add_wait_queue()和remove_wait_queue()就等同于list_add()和list_del()。
wait_queue_t结构在include/linux/wait.h中定义,关键元素即为一个struct task_struct变量表征当前进程。
除了wait_event_interruptible()/wake_up_interruptible()以外,和此相对应的更有wait_event()和wake_up()接口,interruptible是更安全、更常用的选择,因为可中断的等待能够接收信号,从而挂起的进程允许被外界kill。
wait_event*()接口是2.4内核引入并推荐使用的,在此之前,最常用的等待操作是interruptible_sleep_on(wait_queue_head_t *wq),当然,和此配套的更有不可中断版本sleep_on(),另外,更有带有超时控制的*sleep_on_timeout()。sleep_on系列函数的语义比wait_event简单,没有条件判断功能,其余动作和wait_event完全相同,也就是说,我们能够用interruptible_sleep_on()来实现wait_event_interruptible()(仅作示意〉:
do{
        interruptible_sleep_on(&log_wait);
        if(condition)
                break;
}while(1);

相对而言,这种操作序列有反复的入队、出队动作,更加耗时,而很大一部分等待操作的确是需要判断一个条件是否满足的,因此2.4才推荐使用wait_event接口。
在wake_up系列接口中,更有一类wake_up_sync()和wake_up_interruptible_sync()接口,确保调度在wake_up返回之后进行。




回页首
原子操作和信号量
POSIX有信号量,SysV IPC有信号量,核内也有信号量,接口很简单,一个down(),一个up(),分别对应P操作和V操作,down()调用可能引起线程挂起,因此和sleep_on类似,也有interruptible系列接口。down意味着信号量减1,up意味着信号量加1,这两个操作显然需要互斥。在Linux 2.4中,并没有如想象中的用锁实现,而是使用了原子操作。
在include/asm/atomic.h中定义了一系列原子操作,包括原子读、原子写、原子加等等,大多是直接用汇编语句来实现的,这里就不周详解释。
我们从信号量数据结构开始,他定义在include/asm/semaphore.h中:
struct semaphore {
        atomic_t count;
        int sleepers;
        wait_queue_head_t wait;
}

down()操作能够理解为申请资源,up()操作能够理解为释放资源,因此,信号量实际表示的是资源的数量连同是否有进程正在等待。在semaphore结构中,count相当于资源计数,为正数或0时表示可用资源数,-1则表示没有空闲资源且有等待进程。而等待进程的数量并不关心。这种设计主要是考虑和信号量的原语相一致,当某个进程执行up()函数释放资源,点亮信号灯时,假如count恢复到0,则表示尚有进程在等待该资源,因此执行唤醒操作。一个典型的down()-up()流程是这样的:
down()-->count做原子减1操作,假如结果不小于0则表示成功申请,从down()中返回;
-->假如结果为负(实际上只可能是-1),则表示需要等待,则调用__down_fail();
__down_fail()调用__down(),__down()用C代码实现,需要已不如down()和__down_fail()严格,在此作实际的等待(arch/i386/kernel/semaphore.c):
void __down(struct semaphore * sem)
{
        struct task_struct *tsk = current;
        DECLARE_WAITQUEUE(wait, tsk);
        tsk->state = TASK_UNINTERRUPTIBLE;
        add_wait_queue_exclusive(&sem->wait, &wait);
        spin_lock_irq(&semaphore_lock);
        sem->sleepers++;
        for (;;) {
                int sleepers = sem->sleepers;
                /*
                 * Add "everybody else" into it. They aren’t
                 * playing, because we own the spinlock.
                 */
                if (!atomic_add_negative(sleepers - 1, &sem->count)) {
                        sem->sleepers = 0;
                        break;
                }
                sem->sleepers = 1;      /* us - see -1 above */
                spin_unlock_irq(&semaphore_lock);
                schedule();
                tsk->state = TASK_UNINTERRUPTIBLE;
                spin_lock_irq(&semaphore_lock);
        }
        spin_unlock_irq(&semaphore_lock);
        remove_wait_queue(&sem->wait, &wait);
        tsk->state = TASK_RUNNING;
        wake_up(&sem->wait);
}

__down()-->当前进程进入wait等待队列,状态为不可中断的挂起,sleepers++,假如这是第一次申请失败,则sleepers值为1,否则为2--这个配置纯粹是为了下面这句原子加而安排的。
在真正进入休眠以前,__down()还是需要判断一下是不是确实没有资源可用,因为在spin_lock之前什么都可能发生。atomic_add_negative()将sleepers-1(只可能是0或1,分别表示仅有一个等待进程或是多个)加到count(假如有多个进程申请资源失败进入__down(),count可能为-2、-3等)之上,这个加法完成后,结果为0只可能是在sleepers等于1的时候发生(因为假如sleepers等于2,表示有多个进程执行了down(),则count必然小于-1,因此sleepers-1+count必然小于0),表示count在此之前已变为0了,也就是说已有进程释放了资源,因此本进程不用休眠而是获得资源退出__down(),从而也从down()中返回;假如没有进程释放资源,那么在任何等待进程的这一加法完成后,count将等于-1。因此,从down()调用外面看(无论是在down()中休眠还是获得资源离开down()),count为负时只可能为-1(此时sleepers等于1),这么设计使得up()操作只需要对count加1,判断是否为0就能够知道是否有必要执行唤醒操作__up_wakeup()了。
获得了资源的进程将把sleepers设为0,并唤醒任何其他等待进程,这个操作实际上只是起到恢复count为-1,并使他们再次进入休眠的作用,因为第一个被唤醒的等待进程执行atomic_add_negative()操作后会将count恢复为-1,然后将sleepers置为1;以后的等待进程则会像往常相同重新休眠。
将down()操作设计得如此复杂的原因和结果就是up操作相当简单。up()利用汇编原子地将count加1,假如小于等于0表示有等待进程,则调用__up_wakeup()-->__up()唤醒wait;否则直接返回。
在down()中竞争获得资源的进程并不是按照优先级排序的,只是在up()操作完成后第一个被唤醒或正在__down()中运行而暂未进入休眠的进程成功的可能性稍高一些。
尽管能够将信号量的count初始化为1从而实现一种互斥锁(mutex),但Linux并不确保这个count不会超过1,因为up操作并不考虑count的初值,所以只能依靠程式员自己来控制不要无谓的执行up()从而破坏mutex的语义。相关的初始化接口定义在include/asm/semaphore.h中,但一般程式员能够通过sema_init()接口来初始化信号量:
#define DECLARE_MUTEX(name) __DECLARE_SEMAPHORE_GENERIC(name,1)
#define DECLARE_MUTEX_LOCKED(name) __DECLARE_SEMAPHORE_GENERIC(name,0)
static inline void sema_init (struct semaphore *sem, int val)
static inline void init_MUTEX (struct semaphore *sem)
static inline void init_MUTEX_LOCKED (struct semaphore *sem)

除了down()以外,Linux还提供了一个down_interruptible(),操作序列和down()基本相同,仅在休眠状态为可中断和信号处理上有所不同。在标准的信号量以外,更有一套读写信号量,用于将资源的读写区分开来进行同步以提高效率,采用读写锁来实现,有兴趣的能够参阅文后列出的参考资料。




回页首
自旋锁
锁是个概念,正如上面提到的mutex互斥锁仅仅是其中的一种。互斥锁被锁定时进入休眠,而系统还能正常运转,但有很多时候,锁应该不但仅互斥访问,甚至应该让系统挂起,直至锁成功,也就是说在锁操作中"自旋",这就是Linux中的spinlock机制。
从实现上来说,自旋锁比较简单,主要分为两部分,一部分是中断处理,一部分是自旋处理,最基础的部分在spin_lock_string和spin_unlock_string这两段汇编代码中:
#define spin_lock_string \
        "\n1:\t" \
        "lock ; decb %0\n\t" \
        "js 2f\n" \
        ".section .text.lock,\"ax\"\n" \
        "2:\t" \
        "cmpb $0,%0\n\t" \
        "rep;nop\n\t" \
        "jle 2b\n\t" \
        "jmp 1b\n" \
        ".previous"
#define spin_unlock_string \
        "movb $1,%0"

不周详解释这段汇编代码的语义,spin_lock_string对锁原子减1,循环检查锁值,直到锁值大于0;而spin_unlock_string则是对锁赋值1。spin_lock_string用于构成spin_lock()函数,spin_unlock_string用于构成spin_unlock()函数。
spin_lock()/spin_unlock()构成了自旋锁机制的基础,他们和关中断local_irq_disable()/开中断local_irq_enable()、关bh local_bh_disable()/开bh local_bh_enable()、关中断并保存状态字local_irq_save()/开中断并恢复状态字local_irq_restore()结合就形成了整套自旋锁机制,接口定义在include/linux/spinlock.h中,这里就不列举了。
实际上,以上提到的spin_lock()都是在CONFIG_SMP的前提下才会生成的,也就是说,在单处理机系统中,spin_lock()是一条空语句,因为在处理机执行他的语句时,不可能受到打扰,语句肯定是串行的。在这种简单情况下,spin_lock_irq()就只需要锁中断就能够完成任务了。在include/linux/spinlock.h中就用#ifdef CONFIG_SMP来区分两种不同的情况。
自旋锁有很多种,信号量也能够用来构成互斥锁,原子操作也有锁功能,而且更有和标准锁机制类似的读写锁变种,在不同的应用场合应该选择不同的锁,下一节就是介绍怎样选择。




回页首
锁的使用
1.用户上下文之间

假如所访问的共享资源仅在用户上下文中使用,最高效的办法就是使用信号量。在net/core/netfilter.c中有一处使用信号量的例子:
static DECLARE_MUTEX(nf_sockopt_mutex);
int nf_register_sockopt(struct nf_sockopt_ops *reg)
{
...
        if (down_interruptible(&nf_sockopt_mutex) != 0)
                return -EINTR;
...
out:
        up(&nf_sockopt_mutex);
        return ret;
}

2.用户上下文和bottom half之间

此时有两种情况需要使用锁,一是用户上下文被bottom half中断,二是多个处理机同时进入一个临界段。一般情况下,使用spin_lock_bh()/spin_unlock_bh()能够满足需要,他将关闭当前CPU的bottom half,然后再获取锁,直至离开临界段再释放并对bottom half重新使能。
3.用户上下文和软中断(Tasklet)之间

tasklet和bottom half的实现机制是相同的,实际上spin_lock_bh()也同时关闭了tasklet的执行,因此,在这种情况下,用户上下文和tasklet之间的同步也使用spin_lock_bh()/spin_unlock_bh()。
4.bottom half之间

bottom half本身的机制就确保了不会有多于1个的bottom half同时处于运行态,即使对于SMP系统也是如此,因此,在设计共享数据的bottom half时无需考虑互斥。
5.tasklet之间

tasklet和bottom half类似,也是受到local_bh_disable()保护的,因此,同一个tasklet不会同时在两个CPU上运行;但不同的tasklet却有可能,因此,假如需要同步不同的tasklet访问共享数据的话,就应该使用spin_lock()/spin_unlock()。正如上面提到的,这种保护仅对SMP系统有意义,UP系统中tasklet的运行不会受到另一个tasklet(不论他是否和之相同)的打扰,因此也就没有必要上锁。
6.softirq之间

softirq是实现tasklet和bottom half的基础,限制较后二者都少,允许两个softirq同时运行于不同的CPU之上,而不论他们是否来自同一个softirq代码,因此,在这种情况下,都需要用spin_lock()/spin_unlock()来同步。
7.硬中断和软中断之间

硬中断是指硬件中断的处理程式上下文,软中断包括softirq和在他基础上实现的tasklet和bottom half等,此时,为了防止硬件中断软中断的运行,同步措施必须包括关闭硬件中断,spin_lock_irq()/spin_unlock_irq()就包括这个动作。更有一对API,spin_lock_irqsave()/spin_unlock_irqrestore(),不但关闭中断,还保存机器状态字,并在打开中断时恢复。
8.其他注意事项

首先需要提醒的是"死锁",这在操作系统原理的课本上都做过介绍,无论是使用信号量还是使用自旋锁,都有可能产生死锁,特别是自旋锁,假如死锁在spin_lock上,整个系统就会挂起。怎样避免死锁是理论课的问题,这里就不多说了。
另外一点就是尽可能短时间的锁定,因此,"对数据上锁,而不是对代码上锁"就成了一个简单的原则;在有可能的情况下,使用读写锁,而不要总是使用互斥锁;对读写排序,使用原子操作,从而完全避免使用锁,也是个不错的设计思想。
不要在锁定状态下调用可能引起休眠的操作,以下这些操作就是现在可能因此休眠的函数:
  • 对用户内存的访问:copy_from_user()、copy_to_user()、get_user()、put_user()
  • kmalloc(GFP_KERNEL)
  • down_interruptible()和down(),假如需要在spinlock中使用信号量,能够选择down_trylock(),他不会引起挂起 printk()的灵巧设计使得他不会挂起,因此能够在任何上下文中使用。

    本文来自ChinaUnix博客,假如查看原文请点:http://blog.chinaunix.net/u1/35795/showart_493963.html
  • 喜欢本文,那就收藏到:

      Del.icio.us Google书签 Digg Live Bookmark Technorati Furl Yahoo书签 Facebook 百度搜藏 新浪ViVi 365Key网摘 天极网摘 和讯网摘 博拉网 POCO网摘 添加到饭否 QQ书签 Digbuzz我挖网
    相关评论  我也要评论
    还没有关于此文章的相关评论!
  • 昵称: (为空则显示guest)
  • 评论分数: ★ ★ ★★★ ★★★★ ★★★★★
  • 评论内容:(不能超过250字,需审核后才会公布,请自觉遵守互联网相关政策法规。
  • 导航
    赞助商
    文章类别
    订阅