OC底层原理29:锁的原理

张建 lol

前言

本文主要介绍常见的锁,以及 synchronized、NSLock、递归锁、条件锁 的底层分析

借鉴一张锁的性能数据对比图,如下所示

可以看出,图中锁的性能从高到底依次是:OSSPinLock(自旋锁) -> dispatch_semaphone(信号量) -> ptread_mutex(互斥锁) -> NSLock(互斥锁) -> NSCondition(条件锁) -> pthread_mutex(recusive 互斥锁) -> NSRecursiveLock(递归锁) -> NSConditionLock(条件锁) -> synchronized(互斥锁)

图中大致分为以下几类:

  • 【自旋锁】:在自旋锁中,线程会反复检查变量是否可用。由于线程这个过程中一致保持执行,所以是一种 忙等待。一旦获取了自旋锁,线程 就会一直保持该锁,直到显示释放自旋锁。自旋锁避免了进程上下文的调度开销,因此对于 线程只会阻塞很短时间的场合有效 的。对于iOS属性的修饰符 atomic,自带一把自旋锁

    • OSSpinLock(废弃)->(互斥锁)os_unfair_lock
    • atomic
  • 【互斥锁】:互斥锁 是一种用于 多线程编程 中,防止两条线程同时对同一公共资源(例如全局变量)进行读写的机制,该目的是通过 将代码切成一个个临界区 而达成

    • @synchroized
    • NSLock
    • pthread_mutex
  • 【条件锁】:条件锁 就是 条件变量,当进程的某些 资源要求不满足 时就 进入休眠,即锁住了,当 资源被分配到了条件锁打开了,进程继续进行

    • NSCondition
    • NSConditionLock
  • 【递归锁】:递归锁就是 同一个线程可以加锁N次而不会引发死锁。递归锁是 特殊的互斥锁,即是 带有递归性质的互斥锁

    • pthread_mutex(recursive)
    • NSRecursiveLock
  • 【信号量】:信号量 是一种 更高级的同步机制互斥锁 可以说是 semaphore在取值0/1时的特例,信号量可以有更多的取值空间,用来 实现更加复杂的同步,而不是单单的线程间互斥

    • dispatch_semaphore
  • 【读写锁】:读写锁实际是一种 特殊的自旋锁。将对共享资源的访问分成 读者写者读者 只对共享资源 进行读访问写者 则需要对共享资源 进行写操作,这种锁相对于自旋锁而言,能 提高并发性

    • 一个读写锁同时只能有一个写者或者多个读者,但不能即有读者又有写者,在读写锁保持期间也是抢占失效的

    • 如果 读写锁当前没有读者,也没有写者,那么写者 可以立刻获取 读写锁,否则它必须自旋 在那里,直到没有任何写者或读者,如果读写锁没有写者,那么读者可以 立即获取读写锁

其实 基本的锁 就包括三类:自旋锁、互斥锁、读写锁,其他的比如 条件锁、递归锁、信号量 都是 上层的封装和实现

OSSpinLock(自旋锁)

自从 OSSpinLock 出现安全问题,在iOS10之后就被废弃了,自旋锁之所有不安全,是因为 获取锁后,线程会一直处于忙等待,造成了 任务的优先级反转

其中的忙等待机制可能会造成 高优先级任务一直runing等待,占用时间片,而 低优先级的任务 无法抢占时间片,会造成一直不能完成,锁未释放的情况

OSSpinLock 被废弃后,其替代方案是内部封装了 os_unfair_lock,而 os_unfair_lock 在加锁时会处于 休眠状态而不是自旋锁的忙等待状态

atomic(原子锁)

atomic 适用于OC中属性的修饰符,其 自带一把自旋锁,但是这个一般基本不使用,都是使用的 nonatomic

在前面的文章中,我们提及 setter 方法会根据修饰符调用不同的方法,其中最后会统一调用 reallySetProperty 方法,其中就有 atomic非atomic 的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static inline void reallySetProperty(id self, SEL _cmd, id newValue, ptrdiff_t offset, bool atomic, bool copy, bool mutableCopy)
{
...
id *slot = (id*) ((char*)self + offset);
...

if (!atomic) { // 未加锁
oldValue = *slot;
*slot = newValue;
} else { // 加锁
spinlock_t& slotlock = PropertyLocks[slot];
slotlock.lock();
oldValue = *slot;
*slot = newValue;
slotlock.unlock();
}
...
}

从源码中可以看出,对于 atomic 修饰的属性,进行了 spinlock_t 加锁处理,但是在前文中提到 OSSpinLock 已经废弃了,这里的 spinlock_t 在底层是通过 os_unfair_lock 替代了 OSSpinLock 实现的加锁。同时为了 防止哈希冲突,还是用了 加盐 操作

1
2
3
4
5
6
using spinlock_t = mutex_tt<LOCKDEBUG>;

class mutex_tt : nocopy_t {
os_unfair_lock mLock;
...
}

getter 方法中对 atomic 的处理,同 setter 是大致相同的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
id objc_getProperty(id self, SEL _cmd, ptrdiff_t offset, BOOL atomic) {
if (offset == 0) {
return object_getClass(self);
}

// Retain release world
id *slot = (id*) ((char*)self + offset);
if (!atomic) return *slot;

// Atomic retain release world
spinlock_t& slotlock = PropertyLocks[slot];
slotlock.lock(); // 加锁
id value = objc_retain(*slot);
slotlock.unlock(); // 解锁

// for performance, we (safely) issue the autorelease OUTSIDE of the spinlock.
return objc_autoreleaseReturnValue(value);
}

synchronized(互斥递归锁)

  • 开启汇编模式,发现 @synchronized 在执行过程中,会走底层的 objc_sync_enterobjc_sync_exit 方法

  • 也可以通过 clang,查看底层编译代码

  • 通过对 objc_sync_enter 方法符号断点,查看底层所在的源码库,通过断点发现在objc源码中,即 libpbjc.A.dylib

objc_sync_enter & objc_sync_exit 分析

  • 进入 objc_sync_enter 源码实现

    • 如果obj存在,则通过 id2data 方法获取响应的 SyncData,对 threadCount、lockCount 进行 递增 操作
    • 如果obj不存在,则调用 objc_sync_nil,通过符号断点得知,这个方法里面什么都没做,直接return了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int objc_sync_enter(id obj)
{
int result = OBJC_SYNC_SUCCESS;

if (obj) { // 传入不为nil
SyncData* data = id2data(obj, ACQUIRE); // 重点
ASSERT(data);
data->mutex.lock(); // 加锁
} else { // 传入nil
// @synchronized(nil) does nothing
if (DebugNilSync) {
_objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
}
objc_sync_nil();
}

return result;
}
  • 进入 objc_sync_exit 源码实现

    • 如果obj存在,则调用 id2data 方法获取对应的 SyncData,对 threadCount、lockCount 进行 递减 操作
    • 如果obj不存在,什么也不做
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// End synchronizing on 'obj'. 结束对“ obj”的同步
// Returns OBJC_SYNC_SUCCESS or OBJC_SYNC_NOT_OWNING_THREAD_ERROR
int objc_sync_exit(id obj)
{
int result = OBJC_SYNC_SUCCESS;

if (obj) {//obj不为nil
SyncData* data = id2data(obj, RELEASE);
if (!data) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
} else {
bool okay = data->mutex.tryUnlock();//解锁
if (!okay) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
}
}
} else {//obj为nil时,什么也不做
// @synchronized(nil) does nothing
}
return result;
}

通过上面两个实现逻辑的对比,发现它们有一个共同点,在obj存在时,都会通过 id2data 方法,获取 SyncData

  • 进入 SyncData 的定义,是一个结构体,主要用来表示一个 线程data,类似于 链表结构,有next指向,且封装了 recursive_mutex_t 属性,可以确认 @synchronized 确实是一个 递归互斥锁
1
2
3
4
5
6
typedef struct alignas(CacheLineSize) SyncData {
struct SyncData* nextData;
DisguisedPtr<objc_object> object; // 类似链表结构
int32_t threadCount; // number of THREADS using this block
recursive_mutex_t mutex; // 递归锁
} SyncData;
  • 进入 SyncCache 的定义,也是一个结构体,用于存储线程,其中 list[0] 表示 当前线程的链表data,主要用于存储 SyncDatalockCount
1
2
3
4
5
6
7
8
9
10
typedef struct {
SyncData *data;
unsigned int lockCount; // number of times THIS THREAD locked this block
} SyncCacheItem;

typedef struct SyncCache {
unsigned int allocated;
unsigned int used;
SyncCacheItem list[0];
} SyncCache;

id2data 分析

  • 进入 id2data 源码,从上面的分析,可以看出,这个方法是 加锁和解锁 都复用的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
static SyncData* id2data(id object, enum usage why)
{
spinlock_t *lockp = &LOCK_FOR_OBJ(object);
SyncData **listp = &LIST_FOR_OBJ(object);
SyncData* result = NULL;

#if SUPPORT_DIRECT_THREAD_KEYS //tls(Thread Local Storage,本地局部的线程缓存)
// Check per-thread single-entry fast cache for matching object
bool fastCacheOccupied = NO;
// 通过KVC方式对线程进行获取 线程绑定的data
SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY);
// 如果线程缓存中有data,执行if流程
if (data) {
fastCacheOccupied = YES;
// 如果在线程空间找到了data
if (data->object == object) {
// Found a match in fast cache.
uintptr_t lockCount;

result = data;
// 通过KVC获取lockCount,lockCount用来记录 被锁了几次,即 该锁可嵌套
lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY);
if (result->threadCount <= 0 || lockCount <= 0) {
_objc_fatal("id2data fastcache is buggy");
}

switch(why) {
case ACQUIRE: {
// objc_sync_enter走这里,传入的是ACQUIRE -- 获取
lockCount++; // 通过lockCount判断被锁了几次,即表示 可重入(递归锁如果可重入,会死锁)
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);//设置
break;
}
case RELEASE:
// objc_sync_exit走这里,传入的why是RELEASE -- 释放
lockCount--;
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
if (lockCount == 0) {
// remove from fast cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, NULL);
// atomic because may collide with concurrent ACQUIRE
OSAtomicDecrement32Barrier(&result->threadCount);
}
break;
case CHECK:
// do nothing
break;
}

return result;
}
}
#endif

// Check per-thread cache of already-owned locks for matching object
SyncCache *cache = fetch_cache(NO);// 判断缓存中是否有该线程
// 如果cache中有,方式与线程缓存一致
if (cache) {
unsigned int i;
for (i = 0; i < cache->used; i++) { // 遍历总表
SyncCacheItem *item = &cache->list[i];
if (item->data->object != object) continue;

// Found a match.
result = item->data;
if (result->threadCount <= 0 || item->lockCount <= 0) {
_objc_fatal("id2data cache is buggy");
}

switch(why) {
case ACQUIRE: // 加锁
item->lockCount++;
break;
case RELEASE: // 解锁
item->lockCount--;
if (item->lockCount == 0) {
// remove from per-thread cache 从cache中清除使用标记
cache->list[i] = cache->list[--cache->used];
// atomic because may collide with concurrent ACQUIRE
OSAtomicDecrement32Barrier(&result->threadCount);
}
break;
case CHECK:
// do nothing
break;
}

return result;
}
}

// Thread cache didn't find anything.
// Walk in-use list looking for matching object
// Spinlock prevents multiple threads from creating multiple
// locks for the same new object.
// We could keep the nodes in some hash table if we find that there are
// more than 20 or so distinct locks active, but we don't do that now.
// 第一次进来,所有缓存都找不到
lockp->lock();
{
SyncData* p;
SyncData* firstUnused = NULL;
for (p = *listp; p != NULL; p = p->nextData) {//cache中已经找到
if ( p->object == object ) {//如果不等于空,且与object相似
result = p;//赋值
// atomic because may collide with concurrent RELEASE
OSAtomicIncrement32Barrier(&result->threadCount);//对threadCount进行++
goto done;
}
if ( (firstUnused == NULL) && (p->threadCount == 0) )
firstUnused = p;
}

// no SyncData currently associated with object 没有与当前对象关联的SyncData
if ( (why == RELEASE) || (why == CHECK) )
goto done;

// an unused one was found, use it 第一次进来,没有找到
if ( firstUnused != NULL ) {
result = firstUnused;
result->object = (objc_object *)object;
result->threadCount = 1;
goto done;
}
}

// Allocate a new SyncData and add to list.
// XXX allocating memory with a global lock held is bad practice,
// might be worth releasing the lock, allocating, and searching again.
// But since we never free these guys we won't be stuck in allocation very often.
posix_memalign((void **)&result, alignof(SyncData), sizeof(SyncData));//创建赋值
result->object = (objc_object *)object;
result->threadCount = 1;
new (&result->mutex) recursive_mutex_t(fork_unsafe_lock);
result->nextData = *listp;
*listp = result;

done:
lockp->unlock();
if (result) {
// Only new ACQUIRE should get here.
// All RELEASE and CHECK and recursive ACQUIRE are
// handled by the per-thread caches above.
if (why == RELEASE) {
// Probably some thread is incorrectly exiting
// while the object is held by another thread.
return nil;
}
if (why != ACQUIRE) _objc_fatal("id2data is buggy");
if (result->object != object) _objc_fatal("id2data is buggy");

#if SUPPORT_DIRECT_THREAD_KEYS
if (!fastCacheOccupied) { //判断是否支持栈存缓存,支持则通过KVC形式赋值 存入tls
// Save in fast thread cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, result);
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1);//lockCount = 1
} else
#endif
{
// Save in thread cache 缓存中存一份
if (!cache) cache = fetch_cache(YES);//第一次存储时,对线程进行了绑定
cache->list[cache->used].data = result;
cache->list[cache->used].lockCount = 1;
cache->used++;
}
}

return result;
}
  • 【第一步】首页在 tls线程缓存 中查找

    • tls_get_direct 方法中以 线程为key,通过 KVC 的方式获取与之绑定的 SyncData,即线程 data,其中的 tls(),表示 本地局部的线程缓存

    • 判断获取的data是否存在,以及判断data中是否能找到对应的objct

    • 如果都找到了,在 tls_get_direct 方法中以KVC的方式获取 lockcount,用来记录 对象被锁了几次(即锁的嵌套次数)

    • 如果data中的 threadCount 小于等于0,或者 lockCount 小于等于0时,则直接崩溃

    • 通过传入的 why,判断操作类型

      • 如果是 ACQUIRE,表示加锁,则进行lockCount++,并保存到tls缓存
      • 如果是 RELEASE,表示释放,则进行lockCount++,并保存到tls缓存,如果 lockCount 等于 0,从tls中 移除 线程data
      • 如果是 CHECK,则什么也不做
    • 【第二步】如果 tls 中没有,则在 cache缓存 中查找

      • 通过 fetch_cache 方法查找cache缓存中是否有线程
      • 如果有,则遍历cache总表,读取出线程对应的 SyncCacheItem
      • SyncCacheItem 中取出 data,然后后续步骤与tls的匹配是一致的
    • 【第三步】如果cache中也没有,即 第一次进来,则创建 SyncData,并存储到相应缓存中

      • 如果在cache中找到线程,且与object相等,则进行 赋值、以及 threadCount++
      • 如果在cache中没有找到,则 threadCount 等于 1.

    所以在 id2data 方法中,主要分为三种情况

    • 【第一次进来,没有锁】

      • threadCount = 1
      • lockCount = 1
      • 存储到 tls
    • 【不是第一次进来,且是同一个线程】

      • tls中有数据,则 lockCount++
      • 存储到 tls
    • 【不是第一次进来,且是不同线程】

      • 全局线程空间 进行查找线程
      • threadCount++
      • lockCount++
      • 存储到 cache

tls和cache表结构

针对tls和cache缓存,底层的表结构如下所示

  • 哈希表 结构中通过 SyncList 结构来组装 多线程 的情况
  • SyncData 通过 链表 的形式组装 当前可重入 的情况
  • 下层通过 tls线程缓存、cache缓存 来进行 处理
  • 底层主要有两个东西:lockCount、threadCount,解决了递归互斥锁,解决了嵌套可重入

@synchronized 坑点

下面代码这样写,会有什么问题?

1
2
3
4
5
6
7
8
9
10
 - (void)zj_testSync{
_testArray = [NSMutableArray array];
for (int i = 0; i < 200000; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
@synchronized (self.testArray) {
self.testArray = [NSMutableArray array];
}
});
}
}

运行结果发现,运行就崩溃

崩溃的主要原因是 testArray 在某一瞬间变成了 nil,从 @synchronized 底层流程可知,如果 加锁的对象成了nil,是锁不住的,相当于下面这种情况,block内部不停的retain、release,会在某一瞬间 上一个未release下一个已经准备release,这样会导致 野指针 的产生

1
2
3
4
5
6
7
8
- (void)zj_crash{
_testArray = [NSMutableArray array];
for (int i = 0; i < 200000; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
_testArray = [NSMutableArray array];
});
}
}

可以根据上面的代码,打开 edit scheme -> run -> Diagnostics 中勾选 Zombie Objects,来查看是否存在 僵尸对象,结果如下所示

我们一般使用 @synchronized(self),主要是因为 _testArray 的持有者是 self

注意:野指针 vs 过度释放

  • 野指针:是指由于过度释放产生的指针还在进行操作

  • 过度释放:每次都会 retainrelease

总结

  • @synchronized 在底层封装的是一把 递归锁,所以这个锁是 递归互斥锁

  • @synchronized 的可重入,即可 嵌套,主要是由于 lockCountthreadCount 的搭配

  • @synchronized 使用 链表 的原因是 链表方便下一个data的插入

  • 但是由于底层中 链表查询、缓存的查找以及递归,是非常 耗内存 以及 性能 的,导致 性能低,所以在前文中,该锁的排名在最后

  • 不能使用 非OC对象 作为加锁对象,因为其 object 的参数为 id

  • @synchronized(self) 这种适用于 嵌套次数较少 的场景。这里锁住的对象也 并不永远是self,这里需要读者注意

  • 如果锁嵌套次数较多,即 锁self过多,会导致底层的查找非常麻烦,因为其底层是链表进行查找,所以会相对比较麻烦,所以此时可以使用 NSLock、信号量

NSLock

NSLock 是对 下层phread_mutex 的封装,使用如下:

1
2
3
NSLock *lock = [[NSLock alloc] init];
[lock lock];
[lock unlock];

直接进入 NSLock 定义查看,其遵循了 NSLocking 协议,下面来探索 NSLock 的底层实现

NSLock底层分析

  • 通过加符号断点 lock 分析,发现其源码在 Foundation 框架中

  • 由于OC的 Foundation 框架不开源,所以这里借助 Swift 的开源框架 Foundation 来分析 NSLock 的底层实现,其原理与OC是大致相同的

通过源码实现可以看出,底层是通过 pthread_mutex 互斥锁实现的,并且在init方法中,还做了一些其他操作,所以在使用 NSLock 时需要使用 init 初始化

回到前文的性能图中,可以看出 NSLock 的性能仅次于 pthread_mutex(互斥锁),非常接近

使用弊端

请问下面block嵌套block的代码中,会有什么问题?

1
2
3
4
5
6
7
8
9
10
11
12
for (int i= 0; i<100; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
static void (^testMethod)(int);
testMethod = ^(int value){
if (value > 0) {
NSLog(@"current value = %d",value);
testMethod(value - 1);
}
};
testMethod(10);
});
}
  • 在未加锁之前,其中current打印的结果数据混乱,主要原因是多线程导致的

  • 如果像下面这样加锁,会有什么问题?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
NSLock * lock = [[NSLock alloc] init];
for (int i= 0; i<100; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
static void (^testMethod)(int);
testMethod = ^(int value){
[lock lock];
if (value > 0) {
NSLog(@"current value = %d",value);
testMethod(value - 1);
}
};
testMethod(10);
[lock unlock];
});
}

会出现一直等待的情况,主要是因为 嵌套使用的递归,使用 NSLock(简单的互斥锁,如果没有回来,会一直睡觉等待),即会存在一直 加lock,等不到 unlock 的堵塞情况

所以,针对这种情况,可以使用以下方式解决

  • 使用 NSLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
NSLock * lock = [[NSLock alloc] init];
for (int i= 0; i<100; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
static void (^testMethod)(int);
[lock lock];
testMethod = ^(int value){
if (value > 0) {
NSLog(@"current value = %d",value);
testMethod(value - 1);
}else {
[lock unlock];
}
};
testMethod(10);
});
}
  • 使用 @synchronized
1
2
3
4
5
6
7
8
9
10
11
12
13
14
for (int i= 0; i<100; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
static void (^testMethod)(int);
testMethod = ^(int value){
@synchronized (self) {
if (value > 0) {
NSLog(@"current value = %d",value);
testMethod(value - 1);
}
}
};
testMethod(10);
});
}
  • 使用递归锁 NSRecursiveLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
NSRecursiveLock * lock = [[NSRecursiveLock alloc] init];
for (int i= 0; i<100; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
static void (^testMethod)(int);
[lock lock]; // 加锁
testMethod = ^(int value){
if (value > 0) {
NSLog(@"current value = %d",value);
testMethod(value - 1);
}
[lock unlock];
};
testMethod(10);
});
}

pthread_mutex

pthread_mutex 就是 互斥锁本身,当锁被占用,其他线程申请锁时,不会一直忙等待,而是 阻塞线程并睡眠

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 导入头文件
#import <tphread.j>

// 全局声明互斥锁
phread_mutex_t _lock;

// 初始化互斥锁
pthread_mutex_init(&_lock, NULL);

// 加锁
pthread_mutex_lock(&_lock);

// 这里需要线程安全操作
// 解锁
pthread_mutex_unlock(&_lock);

// 释放锁
pthread_mutex_destroy(&_lock);

NSRecursiveLock

NSRecursiveLock 在底层也是对 pthread_mutex 的封装,可以通过 swiftFoundation 源码查看

对比 NSLockNSRecursiveLock,其底层实现几乎一模一样,区别在于init时,NSRecursiveLock 有一个标识 PTHREAD_MUTEX_RECURSIVE ,而 NSLock 是默认的

递归锁 主要是用于 解决一些嵌套形式,其中循环嵌套居多

NSCondition

NSCondition 是一个 条件锁,在日常开发中使用较少,与信号量有点相似:线程1 需要满足条件才会往下走,否则会 堵塞等待,直到条件满足,经典模型是 生产消费者模型

NSCondition的对象 实际上作为一个 和一个 线程检查器

  • 主要为了 当检测条件时保护数据源执行条件引发的任务
  • 线程检查器 主要是 根据条件决定是否继续运行线程,即线程是否被 阻塞

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 初始化
NSCondition *condition = [[NSCondition alloc] init]

// 一般用于多线程同时访问、修改同一个数据源,保证在同一时间内数据源只被访问、修改一次,其他线程的命令需要在lock 外等待,只到 unlock ,才可访问
[condition lock];

// 与lock 同时使用
[condition unlock];

// 让当前线程处于等待状态
[condition wait];

// CPU发信号告诉线程不用在等待,可以继续执行
[condition signal];

底层分析

通过swift的Foundation源码查看 NSCondition 的底层实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
open class NSCondition: NSObject, NSLocking {
internal var mutex = _MutexPointer.allocate(capacity: 1)
internal var cond = _ConditionVariablePointer.allocate(capacity: 1)
// 初始化
public override init() {
pthread_mutex_init(mutex, nil)
pthread_cond_init(cond, nil)
}
// 析构
deinit {
pthread_mutex_destroy(mutex)
pthread_cond_destroy(cond)

mutex.deinitialize(count: 1)
cond.deinitialize(count: 1)
mutex.deallocate()
cond.deallocate()
}
// 加锁
open func lock() {
pthread_mutex_lock(mutex)
}
// 解锁
open func unlock() {
pthread_mutex_unlock(mutex)
}
// 等待
open func wait() {
pthread_cond_wait(cond, mutex)
}
// 等待
open func wait(until limit: Date) -> Bool {
guard var timeout = timeSpecFrom(date: limit) else {
return false
}
return pthread_cond_timedwait(cond, mutex, &timeout) == 0
}
// 信号,表示等待的可以执行了
open func signal() {
pthread_cond_signal(cond)
}
// 广播
open func broadcast() {
// 汇编分析 - 猜 (多看多玩)
pthread_cond_broadcast(cond) // wait signal
}
open var name: String?
}

其底层也是对下层 pthread_mutex 的封装

  • NSCondition 是对 mutexcond 的一种封装(cond就是用于访问和操作特定类型数据的指针)

  • wait 操作会 阻塞线程,使其进入 休眠状态,直至超时

  • signal 操作是 唤醒 一个正在休眠等待的线程

  • broadcast 会唤醒所有在等待的线程

NSConditionLock

NSConditionLock 是条件锁,一旦一个线程获取锁,其他线程一定等待

相比 NSConditionLock 而言,NSCondition 使用比较麻烦,所以推荐使用 NSConditionLock ,其使用如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 初始化
NSConditionLock * conditionLock = [[NSConditionLock alloc] initWithCondition:2];

// 表示 conditionLock 期待获得锁,如果没有其他线程获得锁(不需要判断内部的 condition) 那它能执行此行以下代码,如果已经有其他线程获得锁(可能是条件锁,或者无条件锁),则等待,直至其他线程解锁
[conditionLock lock];

// 表示如果没有其他线程获得该锁,但是该锁内部的 condition 不等于A条件,它依然不能获得锁,仍然等待。如果内部的condition等于A条件,并且没有其他线程获得该锁,则进入代码区,同时设置它获得该锁,其他任何线程都将等待它代码的 完成,直至它解锁。
[conditionLock lockWhenCondition:A条件];

// 表示释放锁,同时把内部的condition设置为A条件
[conditionLock unlockWithCondition:A条件];

// 表示如果被锁定(没获得 锁),并超过该时间则不再阻塞线程。但是注意:返回的值是NO,它没有改变锁的状态,这个函 数的目的在于可以实现两种状态下的处理
return = [conditionLock lockWhenCondition:A条件 beforeDate:A时间];

// 其中所谓的condition就是整数,内部通过整数比较条件

NSConditionLock ,其本质就是 NSCondition + Lock,以下是其swift的底层实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
open class NSConditionLock : NSObject, NSLocking {
internal var _cond = NSCondition()
internal var _value: Int
internal var _thread: _swift_CFThreadRef?

public convenience override init() {
self.init(condition: 0)
}

public init(condition: Int) {
_value = condition
}

open func lock() {
let _ = lock(before: Date.distantFuture)
}

open func unlock() {
_cond.lock()
_thread = nil
_cond.broadcast()
_cond.unlock()
}

open var condition: Int {
return _value
}

open func lock(whenCondition condition: Int) {
let _ = lock(whenCondition: condition, before: Date.distantFuture)
}

open func `try`() -> Bool {
return lock(before: Date.distantPast)
}

open func tryLock(whenCondition condition: Int) -> Bool {
return lock(whenCondition: condition, before: Date.distantPast)
}

open func unlock(withCondition condition: Int) {
_cond.lock()
_thread = nil
_value = condition
_cond.broadcast()
_cond.unlock()
}

open func lock(before limit: Date) -> Bool {
_cond.lock()
while _thread != nil {
if !_cond.wait(until: limit) {
_cond.unlock()
return false
}
}
_thread = pthread_self()
_cond.unlock()
return true
}

open func lock(whenCondition condition: Int, before limit: Date) -> Bool {
_cond.lock()
while _thread != nil || _value != condition {
if !_cond.wait(until: limit) {
_cond.unlock()
return false
}
}
_thread = pthread_self()
_cond.unlock()
return true
}

open var name: String?
}

通过源码可以看出

  • NSConditionLockNSCondition 的封装
  • NSConditionLock 可以 设值锁条件,即 NSCondition值,而 NSCondition只是信号的通知

调试验证

以下面的代码为例,调试 NSConditionLock 底层流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
- (void)zj_testConditonLock{
// 信号量
NSConditionLock * conditionLock = [[NSConditionLock alloc] initWithCondition:2];
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
[conditionLock lockWhenCondition:1]; // conditoion = 1 内部 Condition 匹配
NSLog(@"线程 1");
[conditionLock unlockWithCondition:0];
});

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
[conditionLock lockWhenCondition:2];
sleep(0.1);
NSLog(@"线程 2");
[conditionLock unlockWithCondition:1]; // _value = 2 -> 1
});

dispatch_async(dispatch_get_global_queue(0, 0), ^{
[conditionLock lock];
NSLog(@"线程 3");
[conditionLock unlock];
});
}

查看打印结果

打印结果分析

  • 线程1 调用 [conditionLock lockWhenCondition:1],此时因为不满足条件,所以会 进入waiting状态,当进入到waiting时,会释放当前的互斥锁

  • 此时当前的 线程3 调用 [conditionLock lock],本质上是调用 [NSConditionLock lockBeforeDate:],这里 不需要对比条件值,所以 线程3先打印

  • 接下来 线程2 执行 [conditionLock lockWhenCondition:2],因为 满足条件值,所以 线程2会打印,打印完成后调用 [conditionLock unlockWithCondition:1],这个时候将value设置为 1,并发送 boradcast,此时 线程1 接收到当前的信号,唤醒执行并打印

  • 自此当前打因为:线程3->线程2->线程1

  • [conditionLock lockWhenCondition:] 这里会根据传入的 condition值Value值 进行对比,如果 不相等,这里 就会阻塞,进入线程池,否则的话就继续代码执行 [conditionLock unlockWithCondition:1] ,这里会 先更改当前的value值,然后进行尝试。然后 进行广播,唤醒当前的线程

性能总结

  • OSSpinLock自旋锁 由于安全性问题,在iOS10之后已经被废弃,其底层的实现用 os_unfair_lock 替代

    • 使用 OSSpinLock 及所示,会处于 忙等待状态
    • os_unfair_lock 是处于 休眠状态
  • atomic原子锁 自带一把自旋锁,只能保证 setter、getter 时的线程安全,在日常开发中使用更多的还是 nonatomic 修饰属性

    • atomic:当属性在调用 setter、getter 方法时,会加上 自旋锁OSSpinLock,用于保证同一时刻只能有一个线程调用属性的读或写,避免了属性读写不同步的问题,由于是底层编译器自动生成的互斥锁代码,会导致效率相对较低

    • nonatomic:当属性在调用 setter、getter 方法时,不会加上自旋锁,即 线程不安全,由于编译器不会自动生成互斥锁代码,可以 提高效率

  • @synchronized 在底层维护了一个 哈希表 进行线程data的存储,通过 链表 表示 可重入(即嵌套)的特性,虽然性能较低,但由于简单好用,使用频率高

  • NSLock、NSRecursiveLock 底层是对 pthread_mutex 的封装

  • NSConditionNSConditionLock 是条件锁,底层都是对 pthread_mutex 的封装,当满足某一条件时才能进行操作,和信号量 dispatch_semphore 类似

锁的使用场景

  • 如果只是 简单 的使用,例如涉及线程安全,使用 NSLock 即可。

  • 如果是 循环嵌套,推荐使用 @synchronized ,主要是因为使用 递归锁 的性能不如使用 @synchronized 的性能(因为在 @synchronized 中无论怎么重入,都没有关系,而 NSRecursiveLock 可能会出现崩溃现象)

  • 循环嵌套 中,如果 递归锁 掌握的很好,则建议使用 递归锁,因为性能好

  • 如果是 循环嵌套,并且还有 多线程影响,例如有 等待、死锁现象 时,建议 使用 @synchronized

  • Post title:OC底层原理29:锁的原理
  • Post author:张建
  • Create time:2021-04-10 14:46:18
  • Post link:https://redefine.ohevan.com/2021/04/10/OC底层原理/OC底层原理29:锁的原理/
  • Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.