前言

一、锁的初识

二、@synchronized

三、pthread_mutex

四、NSLock

五、NSCondition

六、NSConditionLock

七、NSConditionLock底层分析

八、读写锁


多线程前面已经总结了NSThread、GCD和NSOperationQueue等。接下来重点探索总结锁的相关知识点。


一、锁的初识

1、锁的性能数据:


由上图可以看到:
(1)性能最高的锁是自旋锁,但是苹果废弃了自旋锁,原因就是会产生线程的优先级反转问题(自旋锁不会记录当前线程的信息,无法通过优先级继承或者优先级天花板等方法解决优先级反转问题),这个之前在多线程的博客中有具体说明。
(2)性能最差的锁是@synchronized,这个后面具体分析其底层原理。
(3)排名第二的信号量,在GCD的博客中也具体的分析过了。
(4)其他的锁后面具体分析。

2、锁的归类

(1)自旋锁:线程反复检查锁变量是否可用。由于线程在这一过程中保持执行,因此是一种忙等待。一旦获取了自旋锁,线程会一直保持该锁,直至显式释放自旋锁。自旋锁避免了进程上下文的调度开销,因此对于线程只会阻塞很短时间的场合是有效的。
(2)互斥锁:是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区而达成。互斥锁有NSLock、pthread_mutex、@synchronized。
(3)条件锁:就是条件变量,当进程的某些资源要求不满⾜时就进⼊休眠,也就是锁住了。当资源被分配到了,条件锁打开,进程继续运⾏。条件锁有:NSCondition和NSConditionLock
(4)递归锁:就是同⼀个线程可以加锁N次⽽不会引发死锁。递归锁有:NSRecursiveLock和pthread_mutex(recursive)
(5)信号量(semaphore):是⼀种更⾼级的同步机制,互斥锁可以说是semaphore在仅取值0/1时的特例。信号量可以有更多的取值空间,⽤来实现更加复杂的同步,⽽不单单是线程间互斥。
锁基本上就两大类:自旋锁和互斥锁,也可以说有三类:自旋锁、互斥锁、读写锁。其他都是基于它们的上层封装。

3、卖票案例

卖票的案例可以说非常的经典了,这里在具体的总结下:

- (void)testSaleTicket {
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        for (int i = 0; i < 5; i++) {
            [self saleTicket];
        }
    });
    
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        for (int i = 0; i < 5; i++) {
            [self saleTicket];
        }
    });
    
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        for (int i = 0; i < 3; i++) {
            [self saleTicket];
        }
    });
    
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        for (int i = 0; i < 10; i++) {
            [self saleTicket];
        }
    });
}

- (void)saleTicket {
    if (self.ticketCount > 0) {
        self.ticketCount--;
        sleep(0.1);
        NSLog(@"当前余票还剩:%ld张",self.ticketCount);
        
    }else{
        NSLog(@"当前车票已售罄");
    }
}

以上代码在不加锁的情况下,打印结果是无序的。

当我们加入@synchronized时,线程就安全了。

- (void)saleTicket {
    @synchronized (self) {
        if (self.ticketCount > 0) {
            self.ticketCount--;
            sleep(0.1);
            NSLog(@"当前余票还剩:%ld张",self.ticketCount);
            
        }else{
            NSLog(@"当前车票已售罄");
        }
    }
}

二、@synchronized

1、初步分析

通过断点进入汇编可以看到:


分别追加了objc_sync_enter和objc_sync_exit两个函数。
继续通过xcrun看下c++的代码:

xcrun -sdk iphonesimulator clang -arch arm64 -rewrite-objc main.m -o main.cpp
int main(int argc, char * argv[]) {
    NSString * appDelegateClassName;
    /* @autoreleasepool */ { __AtAutoreleasePool __autoreleasepool; 
        appDelegateClassName = NSStringFromClass(((Class (*)(id, SEL))(void *)objc_msgSend)((id)objc_getClass("AppDelegate"), sel_registerName("class")));
        {
            id _rethrow = 0;
            id _sync_obj = (id)appDelegateClassName;
            objc_sync_enter(_sync_obj);
            try {
                struct _SYNC_EXIT {
                    _SYNC_EXIT(id arg) : sync_exit(arg) {}
                    ~_SYNC_EXIT() {
                        objc_sync_exit(sync_exit);}
                        id sync_exit;
                }
                _sync_exit(_sync_obj);
                
            } catch (id e) {
                _rethrow = e;
                
            }
        
            { struct _FIN { _FIN(id reth) : rethrow(reth) {}
                ~_FIN() { if (rethrow) objc_exception_throw(rethrow); }
                id rethrow;
            } _fin_force_rethow(_rethrow);}
        }
        
    }
    return UIApplicationMain(argc, argv, __null, appDelegateClassName);
}

可以看到,@synchronized确实会转化成objc_sync_enter 和 objc_sync_exit(_SYNC_EXIT析构之后就会走objc_sync_exit)。

2、源码分析

直接进入objc的源码分析。

// Begin synchronizing on 'obj'. 
// Allocates recursive mutex associated with 'obj' if needed.
// Returns OBJC_SYNC_SUCCESS once lock is acquired.  
int objc_sync_enter(id obj)
{
    int result = OBJC_SYNC_SUCCESS;
    if (obj) {
        SyncData* data = id2data(obj, ACQUIRE);
        ASSERT(data);
        data->mutex.lock();
    } else {
        // @synchronized(nil) does nothing
        if (DebugNilSync) {
            _objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
        }
        objc_sync_nil();
    }
    return result;
}

int objc_sync_exit(id obj)
{
    int result = OBJC_SYNC_SUCCESS;
    
    if (obj) {
        SyncData* data = id2data(obj, RELEASE); 
        if (!data) {
            result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
        } else {
            bool okay = data->mutex.tryUnlock();
            if (!okay) {
                result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
            }
        }
    } else {
        // @synchronized(nil) does nothing
    }
    return result;
}

objc_sync_enter函数注释可以总结@synchronized的特性:

@synchronized 是一种递归的互斥锁,因为是递归互斥锁,所以可以嵌套使用。

对于线程访问来说:
(1)同一个线程可以重复锁。
(2)多个线程也可以操作。

  • <1>进入objc_sync_enter函数,首先判断obj是否存在,如果不存在就走objc_sync_nil(),不做处理。
#   define BREAKPOINT_FUNCTION(prototype)                             \
    OBJC_EXTERN __attribute__((noinline, used, visibility("hidden"))) \
    prototype { asm(""); }

BREAKPOINT_FUNCTION(
    void objc_sync_nil(void)
);

这里也注意一个坑点:

for (int i = 0; i < 200000; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            @synchronized(_testArray) {
                _testArray = [NSMutableArray new];
            }
        });
}

以上代码仍然会发生崩溃,因为_testArray在某一个临界点会是nil,当nil的时候是不会加锁的,所以会崩溃。

如果存在就会走id2data函数返回一个SyncData,然后加锁。

  • <2>进入objc_sync_exit函数,同样的会先判断objc是否存在,如果不存在就不处理,如果存在就通过id2data函数获取SyncData,然后解锁。

3、SyncData结构

由上面的分析,我们就可以清楚的知道加锁和解锁的过程,继续重点分析SyncData和id2data函数。

typedef struct alignas(CacheLineSize) SyncData {
    struct SyncData* nextData;
    DisguisedPtr<objc_object> object;
    int32_t threadCount;  // number of THREADS using this block
    recursive_mutex_t mutex;
} SyncData;

SyncData是一个单向链表结构,其中记录了:

  • 下一个元素
  • 包装的objc
  • 访问线程数
  • 递归互斥锁。

3、recursive_mutex_t

using recursive_mutex_t = recursive_mutex_tt<LOCKDEBUG>;
class recursive_mutex_tt : nocopy_t {
    os_unfair_recursive_lock mLock;

  public:
    constexpr recursive_mutex_tt() : mLock(OS_UNFAIR_RECURSIVE_LOCK_INIT) {
        lockdebug_remember_recursive_mutex(this);
    }

    constexpr recursive_mutex_tt(__unused const fork_unsafe_lock_t unsafe)
        : mLock(OS_UNFAIR_RECURSIVE_LOCK_INIT)
    { }

    void lock()
    {
        lockdebug_recursive_mutex_lock(this);
        os_unfair_recursive_lock_lock(&mLock);
    }

    void unlock()
    {
        lockdebug_recursive_mutex_unlock(this);

        os_unfair_recursive_lock_unlock(&mLock);
    }
};

可以看到recursive_mutex_t底层是os_unfair_recursive_lock,在之前的objc版本中,SyncData中的锁是pthread_mutex。
借鉴一下其他人的博客说明os_unfair_lock:

os_unfair_lock是一种底层锁,用于取代OSSpinLock,尝试获取已加锁的线程无需忙等,解锁时由内核唤醒。
和OSSpinLock一样,os_unfair_lock也没有加强公平性和顺序。
例如,释放锁的线程可能立即再次加锁,而之前等待锁的线程唤醒后没有机会尝试加锁。
这样有利于提高性能,但也造成了饥饿(starvation)。
Starvation 指贪婪线程占用共享资源太长时间,其他线程无法访问共享资源、无法取得进展。
例如,某对象的同步方法占用时间过长,并且频繁调用,其他线程尝试调用该方法时会被堵塞,处于 starvation。
注意:该锁包含线程信息,系统可用于解决优先级反转问题。

使用如下:

//初始化
os_unfair_lock _unfairLock = OS_UNFAIR_LOCK_INIT;
//加锁
os_unfair_lock_lock(&_unfairLock);
//解锁
os_unfair_lock_unlock(&_unfairLock);

对于os_unfair_recursive_lock而言,它是非公平递归锁,iOS12开始支持,但目前是私有,不对开发者公开。

4、存储结构

它会被存放在StripedMap这个哈希表中

#define LOCK_FOR_OBJ(obj) sDataLists[obj].lock
#define LIST_FOR_OBJ(obj) sDataLists[obj].data
static StripedMap<SyncList> sDataLists;

class StripedMap {
#if TARGET_OS_IPHONE && !TARGET_OS_SIMULATOR
    // 真机环境下hash表大小只有8
    enum { StripeCount = 8 };
#else
    enum { StripeCount = 64 };
#endif

    struct PaddedT {
        T value alignas(CacheLineSize);
    };

    PaddedT array[StripeCount];
    //哈希函数
    static unsigned int indexForPointer(const void *p) {
        uintptr_t addr = reinterpret_cast<uintptr_t>(p);
        return ((addr >> 4) ^ (addr >> 9)) % StripeCount;
    }
    //......

每一个key对应的是一个SyncList

struct SyncList {
    SyncData *data;
    spinlock_t lock;
    constexpr SyncList() : data(nil), lock(fork_unsafe_lock) { }
};

整个数据结构就如下图:


5、id2data函数

先大体看下id2data函数:

继续具体分析:

#if SUPPORT_DIRECT_THREAD_KEYS
 //查找当前线程是否有SyncData
    // Check per-thread single-entry fast cache for matching object
    bool fastCacheOccupied = NO;
    SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY);
    if (data) {
        fastCacheOccupied = YES;

        if (data->object == object) {
            // Found a match in fast cache.
            uintptr_t lockCount;
            result = data;
            lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY);
            //...
            switch(why) {
            case ACQUIRE: {
                lockCount++;
                tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
                break;
            }
            case RELEASE:
                lockCount--;
                tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
                if (lockCount == 0) {
                    // remove from fast cache
                    tls_set_direct(SYNC_DATA_DIRECT_KEY, NULL);
                    // atomic because may collide with concurrent ACQUIRE
                    OSAtomicDecrement32Barrier(&result->threadCount);
                }
                break;
            case CHECK:
                // do nothing
                break;
            }
            return result;
        }
    }
#endif

首先如果从tls找的了data,继续从tls获取lockCount,然后判断操作:ACQUIRE和RELEASE。
如果是ACQUIRE,就对lockCount++,如果RELEASE就对lockCount--。
也就是通过lockCount来判断,被锁了多少次,也就证明了@synchronized可以被嵌套使用(可重入(重复被锁))。

接着看SyncCache:

//检查已拥有锁的每个线程(app的所有线程)缓存是否匹配对象
    // Check per-thread cache of already-owned locks for matching object
    SyncCache *cache = fetch_cache(NO);
    if (cache) {
        unsigned int i;
        for (i = 0; i < cache->used; i++) {
            SyncCacheItem *item = &cache->list[i];
            if (item->data->object != object) continue;

            // Found a match.
            result = item->data;
            if (result->threadCount <= 0  ||  item->lockCount <= 0) {
                _objc_fatal("id2data cache is buggy");
            }
                
            switch(why) {
            case ACQUIRE:
                item->lockCount++;
                break;
            case RELEASE:
                item->lockCount--;
                if (item->lockCount == 0) {
                    // remove from per-thread cache
                    cache->list[i] = cache->list[--cache->used];
                    // atomic because may collide with concurrent ACQUIRE
                    OSAtomicDecrement32Barrier(&result->threadCount);
                }
                break;
            case CHECK:
                // do nothing
                break;
            }

            return result;
        }
    }

SyncCache是从已拥有锁的每个线程(app所有的线程)缓存查找,也是使用了tls,但是_objc_pthread_key是一个全局的key。

_objc_pthread_data *_objc_fetch_pthread_data(bool create)
{
    _objc_pthread_data *data;

    data = (_objc_pthread_data *)tls_get(_objc_pthread_key);
    if (!data  &&  create) {
        data = (_objc_pthread_data *)
            calloc(1, sizeof(_objc_pthread_data));
        tls_set(_objc_pthread_key, data);
    }

    return data;
}

当SyncChache也没有找到的时候,就从总链接去查找:

{
        //从总链表去查找
        SyncData* p;
        SyncData* firstUnused = NULL;
        for (p = *listp; p != NULL; p = p->nextData) {
            if ( p->object == object ) {
                result = p;
                // atomic because may collide with concurrent RELEASE
                OSAtomicIncrement32Barrier(&result->threadCount);
                goto done;
            }
            if ( (firstUnused == NULL) && (p->threadCount == 0) )
                firstUnused = p;
        }
    
        //没有当前与对象关联的SyncData
        // no SyncData currently associated with object
        if ( (why == RELEASE) || (why == CHECK) )
            goto done;
    
        //发现N个未使用的,使用它
        // an unused one was found, use it
        if ( firstUnused != NULL ) {
            result = firstUnused;
            result->object = (objc_object *)object;
            result->threadCount = 1;
            goto done;
        }
    }

当总链表也没有的时候,就创建一个新的SyncData

//如果都没有找到就创建一个
    // Allocate a new SyncData and add to list.
    // XXX allocating memory with a global lock held is bad practice,
    // might be worth releasing the lock, allocating, and searching again.
    // But since we never free these guys we won't be stuck in allocation very often.
    posix_memalign((void **)&result, alignof(SyncData), sizeof(SyncData));
    result->object = (objc_object *)object;
    result->threadCount = 1;
    new (&result->mutex) recursive_mutex_t(fork_unsafe_lock);
    result->nextData = *listp;
    *listp = result;

最后会刷新一下所有缓存:

done:
    lockp->unlock();
    if (result) {
        // Only new ACQUIRE should get here.
        // All RELEASE and CHECK and recursive ACQUIRE are 
        // handled by the per-thread caches above.
        if (why == RELEASE) {
            // Probably some thread is incorrectly exiting 
            // while the object is held by another thread.
            return nil;
        }
        if (why != ACQUIRE) _objc_fatal("id2data is buggy");
        if (result->object != object) _objc_fatal("id2data is buggy");

#if SUPPORT_DIRECT_THREAD_KEYS
        if (!fastCacheOccupied) {
            // Save in fast thread cache
            tls_set_direct(SYNC_DATA_DIRECT_KEY, result);
            tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1);
        } else 
#endif
        {
            // Save in thread cache
            if (!cache) cache = fetch_cache(YES);
            cache->list[cache->used].data = result;
            cache->list[cache->used].lockCount = 1;
            cache->used++;
        }
    }

那么对于多线程而言,每一个线程都可以锁多个对象(通过SyncData的链表),每一个对象又可以锁多次(通过lockCount)。

6、加锁流程总结

threadCount、lockCount

(1)第一次进来没有锁的时候:threadCount=1,lockCount=1,刷新缓存。
(2)同一个线程,不是第一次的时候:lockCount++,刷新缓存。
(3)不同线程,不是第一次的时候,threadCount++,lockCount++,刷新缓存。

整体获取SyncData的流程如下:

(1)获取当前线程的缓存,查看是否是当前要加锁的对象,如果是则lockCount++,更新缓存并结束流程,如果不是走第二步。
(2)获取拥有锁的所有线程缓存,查看是否存在当前要加锁对象,如果存在则lockCount++,更新缓存并结束流程。如果不存在走第三步。
(3)查找总链表,看是否存在当前要加锁的对象,存在则goto done。如果只找到未使用的,则使用它,然后goto done。如果都不存在,则去创建一个新的SyncData。

7、解锁流程总结

(1)获取当前线程的缓存,查看是否是当前要加锁的对象,如果存在,则执行lockCount--,如果当前的lockCount0,则threadCount--,更新缓存并结束流程。如果不存在走第二步(一般情况下,都会存在)
(2)获取拥有锁的所有线程缓存,查看是否存在当前要加锁对象,如果存在则执行lockCount--,如果当前的lockCount
0,则threadCount--,更新缓存并结束流程。

8、 @synchronized总结:

(1)@synchronized是一把支持多线程递归的互斥锁。
(2)@synchronized性能差的原因就是底层内部不断的增删改查所致。
(3)由于@synchronized方便简单,所以经常使用。

三、pthread_mutex

在Posix Thread中定义有一套专门用于线程同步的mutex函数。
mutex,用于保证在任何时刻,都只能有一个线程访问该对象。当获取锁操作失败时,线程会进入睡眠,等待锁释放时被唤醒。

1、创建

有两种方法创建互斥锁,静态方式和动态方式。

  • 静态方式:POSIX定义了一个宏PTHREAD_MUTEX_INITIALIZER来静态初始化互斥锁。
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  • 动态方式:
pthread_mutex_t _lock;
pthread_mutex_init(&(_lock), NULL);

2、互斥锁的属性

互斥锁的属性会在创建的时候指定,在IOS系统中有以下几种属性可以选择:

/*
 * Mutex type attributes
 */
#define PTHREAD_MUTEX_NORMAL		0
#define PTHREAD_MUTEX_ERRORCHECK	1
#define PTHREAD_MUTEX_RECURSIVE		2
#define PTHREAD_MUTEX_DEFAULT		PTHREAD_MUTEX_NORMAL

(1)PTHREAD_MUTEX_NORMAL 也就是普通锁。当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁。这种锁策略保证了资源分配的公平性。

(2)PTHREAD_MUTEX_ERRORCHECK 检错锁,如果同一个线程请求同一个锁,则返回EDEADLK,否则与普通锁类型动作相同。这样保证当不允许多次加锁时不出现最简单情况下的死锁。

(3)PTHREAD_MUTEX_RECURSIVE 嵌套锁(递归锁),允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁。如果是不同线程请求,则在加锁线程解锁时重新竞争。

另外:在linux系统中,还有一个属性是适配锁(PTHREAD_MUTEX_ADAPTIVE_NP),动作最简单的锁类型,仅等待解锁后重新竞争。

比如我们将锁的属性指定为PTHREAD_MUTEX_RECURSIVE

pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&_mutex, &attr);

那么它就是一把递归锁了。

3、锁的操作

  • 加锁:
    int pthread_mutex_lock(pthread_mutex_t *mutex)
  • 解锁
    int pthread_mutex_unlock(pthread_mutex_t *mutex)
  • 尝试加锁
    int pthread_mutex_trylock(pthread_mutex_t *mutex)
注意:pthread_mutex_lock会阻塞,pthread_mutex_trylock是非阻塞的。

lock,锁不到的话,就挂起等待,等到我能锁了,再进行一下步操作
trylock就是尝试锁一下,锁不到就拉倒,不会影响自己进行下一步操作。

4、销毁

使用完锁之后需要进行释放:
销毁一个互斥锁即意味着释放它所占用的资源,且要求锁当前处于开放状态。
pthread_mutex_destroy()除了检查锁状态以外(锁定状态则返回EBUSY)没有其他动作。

pthread_mutex_destroy(&(_lock));

四、NSLock

1、NSLock探索

NSLock是基于NSLocking协议的。

@protocol NSLocking

- (void)lock NS_SWIFT_UNAVAILABLE_FROM_ASYNC("Use async-safe scoped locking instead");

- (void)unlock NS_SWIFT_UNAVAILABLE_FROM_ASYNC("Use async-safe scoped locking instead");

@end

NSLock的头文件也非常的简单:

@interface NSLock : NSObject <NSLocking>

- (BOOL)tryLock NS_SWIFT_UNAVAILABLE_FROM_ASYNC("Use async-safe scoped locking instead");

- (BOOL)lockBeforeDate:(NSDate *)limit NS_SWIFT_UNAVAILABLE_FROM_ASYNC("Use async-safe scoped locking instead");

@property (nullable, copy) NSString *name API_AVAILABLE(macos(10.5), ios(2.0), watchos(2.0), tvos(9.0));

@end

我们使用起来也是非常的简单:

 NSLock *lock = [[NSLock alloc] init];
    [lock lock];
    [lock unlock];

那么lock和unlock在底层做了什么事情呢?
先打开汇编走断点,发现走到objc_msgSend,就没有下文了。
继续下一个符号断点。

由于NSLock是属于Foundation框架的。直接分析Swift版本的Foundation源码。

//初始化 (必须进行初始化)
   public override init() {
#if os(Windows)
        InitializeSRWLock(mutex)
        InitializeConditionVariable(timeoutCond)
        InitializeSRWLock(timeoutMutex)
#else
        pthread_mutex_init(mutex, nil)
#if os(macOS) || os(iOS)
        pthread_cond_init(timeoutCond, nil)
        pthread_mutex_init(timeoutMutex, nil)
#endif
#endif
    }

//加锁
     open func lock() {
#if os(Windows)
        AcquireSRWLockExclusive(mutex)
#else
        pthread_mutex_lock(mutex)
#endif
    }

//解锁
    open func unlock() {
#if os(Windows)
        ReleaseSRWLockExclusive(mutex)
        AcquireSRWLockExclusive(timeoutMutex)
        WakeAllConditionVariable(timeoutCond)
        ReleaseSRWLockExclusive(timeoutMutex)
#else
        pthread_mutex_unlock(mutex)
#if os(macOS) || os(iOS)
        // Wakeup any threads waiting in lock(before:)
        pthread_mutex_lock(timeoutMutex)
        //广播出去锁的状态
        pthread_cond_broadcast(timeoutCond)
        pthread_mutex_unlock(timeoutMutex)
#endif
#endif
    }

通过swift源码就知道了,NSLock就是对pthread_mutex的一层封装,而且性能是仅次于pthread_mutex的。

2、NSLock坑点

- (void)testRecursive {
    for (int i = 0; i < 100; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            static void(^testMethod)(int);
            testMethod = ^(int value) {
                if (value > 0) {
                    NSLog(@"current value = %d",value);
                    testMethod(value-1);
                }
            };
            testMethod(10);
        });
    }
}

在以上方法中,出现了block的嵌套调用。
根据以上代码尝试加锁解锁:

(1)方案1:在block调用之前加锁,在所有嵌套调用完之后解锁:
- (void)testRecursive {
    for (int i = 0; i < 100; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            static void(^testMethod)(int);
            [self.lock lock];
            testMethod = ^(int value) {
                if (value > 0) {
                    NSLog(@"current value = %d",value);
                    testMethod(value-1);
                }
            };
            testMethod(10);
            [self.lock unlock];
        });
    }
}

以上加锁没有问题,但是从性能来说的话,会延长没有获取锁的线程的等待时间,阻塞形式很严重。所以换一个思路来加锁的:

方案2:在每次打印执行和打印之后加锁:
- (void)testRecursive {
    for (int i = 0; i < 100; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            static void(^testMethod)(int);
            testMethod = ^(int value) {
                 [self.lock lock];
                if (value > 0) {
                    NSLog(@"current value = %d",value);
                    testMethod(value-1);
                }
                [self.lock unlock];
            };
            testMethod(10);
        });
    }
}

此时打印结果只有:10。
因为NSLock不支持重复使用,在某一个瞬间会加锁多次,但是没有解锁,会造成相互等待。
继续尝试更换加锁位置:

方案3:将加锁放在block调用之前:
- (void)testRecursive {
    for (int i = 0; i < 100; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            static void(^testMethod)(int);
            [self.lock lock];
            testMethod = ^(int value) {
                if (value > 0) {
                    NSLog(@"current value = %d",value);
                    testMethod(value-1);
                }
                [self.lock unlock];
            };
            testMethod(10);
        });
    }
}

此时就会崩溃了,因为NSLock不支持递归调用。
那么此时换成递归锁呢:

- (void)testRecursive {
    NSRecursiveLock *recursiveLock = [[NSRecursiveLock alloc] init];
    for (int i = 0; i < 100; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            static void(^testMethod)(int);
             [recursiveLock lock];
            testMethod = ^(int value) {
                if (value > 0) {
                    NSLog(@"current value = %d",value);
                    testMethod(value-1);
                }
                [recursiveLock unlock];
            };
            testMethod(10);
        });
    }
}

运行正常。

NSRecursiveLock和NSLock在底层的区别:

 public override init() {
        super.init()
#if os(Windows)
        InitializeCriticalSection(mutex)
        InitializeConditionVariable(timeoutCond)
        InitializeSRWLock(timeoutMutex)
#else
#if CYGWIN || os(OpenBSD)
        var attrib : pthread_mutexattr_t? = nil
#else
        var attrib = pthread_mutexattr_t()
#endif
        withUnsafeMutablePointer(to: &attrib) { attrs in
            pthread_mutexattr_init(attrs)
#if os(OpenBSD)
            let type = Int32(PTHREAD_MUTEX_RECURSIVE.rawValue)
#else
            let type = Int32(PTHREAD_MUTEX_RECURSIVE)
#endif
            pthread_mutexattr_settype(attrs, type)
            pthread_mutex_init(mutex, attrs)
        }
#if os(macOS) || os(iOS)
        pthread_cond_init(timeoutCond, nil)
        pthread_mutex_init(timeoutMutex, nil)
#endif
#endif
    }

通过源码分析,它们的区别就是在初始化的时候,NSRecursiveLock使用的属性是:PTHREAD_MUTEX_RECURSIVE,而NSLock使用的默认属性,也就是普通互斥锁。

3.需要注意:

(1)针对方案2,如果改成递归锁,会出现崩溃,这是因为我们的程序是多线程执行的,而NSRecursiveLock并不是一把多线程可递归锁(虽然底层对pthread_mutex进行了封装,但是它并不支持多线程可递归)。
(2)此时需要使用具有多线程可递归性的@synchronized来解决,因为上面分析可知,@synchronized底层SyncData中就有一把递归锁,并且记录了lockCount(可重复加锁次数)和threadCount(线程数)的特性。
(3)https://www.jianshu.com/p/11399045e46f

五、NSCondition

NSCondition的对象实际上作为一个锁和一个线程检查器:

  • 锁主要为了当检测条件时保护数据源,执行条件引发的任务。
  • 线程检查器主要是根据条件决定是否继续运行线程,即线程是否被阻塞。
1: [condition lock]; 一般用于多线程同时访问、修改同一个数据源,保证在同一时间内数据源只被访问、修改一次,其他线程的命令需要在lock外等待,直到unlock,才可访问。
2: [condition lock]; 与lock同时使用
3: [condition wait]; 让当前线程处于等待状态
4: [condition signal]; CPU发信号告诉一个等待中的线程不用在等待,可以继续执行
5: [condition broadcast];CPU发信号告诉所有等待中的线程不用在等待,可以继续执行

例子1: signal和broadcast的区别

- (void)test {
    _condition = [[NSCondition alloc] init];
    [NSThread detachNewThreadWithBlock:^{
        [self.condition lock];
        NSLog(@"执行任务1");
        [self.condition wait];
        sleep(1);
        NSLog(@"执行任务1完毕");
        [self.condition unlock];
    }];
    
    [NSThread detachNewThreadWithBlock:^{
        [self.condition lock];
        NSLog(@"执行任务2");
        [self.condition wait];
        sleep(1);
        NSLog(@"执行任务2完毕");
        [self.condition unlock];
    }];
    
    [NSThread detachNewThreadWithBlock:^{
        [self.condition lock];
        NSLog(@"执行任务3");
        sleep(1);
        NSLog(@"执行任务3完毕");
        //[self.condition signal];
        [self.condition broadcast];
        [self.condition unlock];
    }];
}

以上有3条线程,任务1和任务2都调用了wait。当任务3执行完毕的时候,如果调用signal,只会让其中一条线程继续执行,如果调用broadcast,则会让所有等待中的线程都执行。

例子2: 针对之前卖票的demo做一个升级:不但可以卖票,还能在票卖完之后进行补票。这种场景就是典型的生产消费者模型。

demo如下:

@property (nonatomic, strong) NSCondition *condition;
@property (nonatomic, assign) int ticketCount;
//......

- (void)testCondition {
    _condition = [[NSCondition alloc] init];
    for (int i = 0; i < 50; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            [self testProducer];
        });
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            [self testConsumer];
        });
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            [self testConsumer];
        });
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            [self testProducer];
        });
    }
}

- (void)testProducer {
    self.ticketCount++;
    NSLog(@"生产一个,还有:%i",self.ticketCount);
}

- (void)testConsumer {
    if (self.ticketCount == 0) {
        NSLog(@"消费完了,等待生产");
    }
    self.ticketCount--;
    NSLog(@"消费一个,还剩:%i",self.ticketCount);
}

以上demo,一定会造成混乱,打印结果也是如此。
通过NSCondition来解决生产者消费者问题:

- (void)testProducer {
     [_condition lock]; //控制操作的多线程影响
    self.ticketCount++;
    NSLog(@"生产一个,还有:%i",self.ticketCount);
      [_condition signal];//通知等待中的线程(只对一个线程起作用)
      [_condition unlock];
}

- (void)testConsumer {
    [_condition lock]; //控制操作的多线程影响
    while (self.ticketCount == 0) {
        NSLog(@"消费完了,等待生产");
         [_condition wait]; //让当前线程处于等待状态
    }
    self.ticketCount--;
    NSLog(@"消费一个,还剩:%i",self.ticketCount);
     [_condition unlock];
}

以上demo需要注意一个点:

//...
  while (self.ticketCount == 0) {
        NSLog(@"消费完了,等待生产");
         [_condition wait]; //让当前线程处于等待状态
    }
//...

这里把if换成了while,因为当执行完 [_condition wait]之后,为了不防止其他线程工作,锁会自动解除。由于多线程,多个等待线程可能会同时被唤醒,那么就会通过if的判断进入下面--的操作,造成ticketCount为负数。所以这里用while保证安全。

六、NSConditionLock

1. [conditionlock lock] & [conditionlock unLock]  加锁解锁,不用考虑条件
2. [conditionlock lockWhenCondition:A条件] 表示如果没有其他线程获得锁,但是内部condition不等于A条件时,它依然不能获得锁,仍然等待;如果内部的condition等于A条件,并且没有其他线程获得锁,则进入代码区,同时设置它获得该锁,其他任何线程都将等待它代码的完成,直至它解锁。
3.[conditionlock unlockWithCondition:A条件] 表示释放锁,同时把内部的condition设置为A条件。
4. [conditionlock lockWhenCondition:A条件 beforeDate:A时间]表示如果被锁定(没获得锁),并超过该时间则不再阻塞线程。注意:这个函数没有改变锁的状态,只是超时之后不再拥有条件。

demo如下:

- (void)test {
    NSConditionLock *conditionLock = [[NSConditionLock alloc] initWithCondition:3];
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        [conditionLock lockWhenCondition:1]; // condition = 1
        NSLog(@"线程1");
        [conditionLock unlockWithCondition:0];
    });
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        [conditionLock lockWhenCondition:2];
        NSLog(@"线程2");
        [conditionLock unlockWithCondition:1];
    });
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        [conditionLock lockWhenCondition:3];
        NSLog(@"线程3");
        [conditionLock unlockWithCondition:2];
    });
}

打印顺序为:3 2 1

七、NSConditionLock底层分析

1、简单汇编分析:


打下断点,进入汇编模式。
然后会来到objc_msgSend函数,因为第一个参数是消息接收者,第二参数是sel,所以我们直接通过lldb来验证:

继续往下走,发现objc_msgSend之后就会去走整个方法的查找流程了,所以此时,我们应该通过符号断点来分析。

进入汇编:

此时再查看寄存器

(lldb) register read x0
      x0 = 0x00000001da6a4b40  (void *)0x00000001da6a4b68: NSDate

此时方法调用者变成了NSDate,继续单步执行来到b(跳转)的地方,继续查看x1寄存器

(lldb) register read x1
      x1 = 0x00000001cb665a34  
(lldb) po (SEL)0x00000001cb665a34
"lockWhenCondition:beforeDate:"

我们先不用去深入分析汇编的细节(后面具体来学习汇编指令)。
继续针对"lockWhenCondition:beforeDate:"下符号断点,因为arm汇编对指令优化的很多,很多细节看不出来,所以切换到模拟器,进入汇编:

也就看到了加锁和解锁,并且有设置超时时间的方法。

2、源码分析

关于更具体的底层原理,继续通过swift源码来分析。

open func lock(before limit: Date) -> Bool {
        _cond.lock()
        while _thread != nil {
            if !_cond.wait(until: limit) {
                _cond.unlock()
                return false
            }
        }
#if os(Windows)
        _thread = GetCurrentThread()
#else
        _thread = pthread_self()
#endif
        _cond.unlock()
        return true
    }
    
    open func lock(whenCondition condition: Int, before limit: Date) -> Bool {
        _cond.lock()
        while _thread != nil || _value != condition {
            if !_cond.wait(until: limit) {
                _cond.unlock()
                return false
            }
        }
#if os(Windows)
        _thread = GetCurrentThread()
#else
        _thread = pthread_self()
#endif
        _cond.unlock()
        return true
    }
    
    open var name: String?
}

在每次加锁的时候,会判断condition的条件,如果condtion不等于_value,那么就等待。
在初始化的时间就维护了一个_value的变量。

 public init(condition: Int) {
        _value = condition
    }

同样的在解锁的时候,也会改变value的值:

open func unlock(withCondition condition: Int) {
        _cond.lock()
#if os(Windows)
        _thread = INVALID_HANDLE_VALUE
#else
        _thread = nil
#endif
        _value = condition
        _cond.broadcast()
        _cond.unlock()
    }

注意其中的broadcast是指,唤醒在此NSConditionLock对象上等待的所有线程。

八、读写锁

1、读写锁的特性

(1)⼀个读写锁同时只能有⼀个写者或多个读者(与CPU数相关)
(2)不能同时既有读者⼜有写者

总结:⼀次只有⼀个线程可以占有写模式的读写锁, 但是可以有多个线程同时占有读模式的读写锁
读写锁适合于对数据结构的读次数⽐写次数多得多的情况. 因为, 读模式锁定时可以共享, 以写模式锁住时意味着独占, 所以读写锁⼜叫共享-独占锁。

2、通过GCD实现读写锁(同步任务+栅栏)

demo如下:

@interface Test7ViewController ()
{
    dispatch_queue_t _readQue;
}
@property (nonatomic, assign) int name;
@end

- (void)viewDidLoad {
    [super viewDidLoad];
    self.view.backgroundColor = [UIColor whiteColor];
    // Do any additional setup after loading the view.
    _readQue = dispatch_queue_create("read-test", DISPATCH_QUEUE_CONCURRENT);

    for (int i = 0; i < 20; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            self.name ++;
        });
    }

    for (int i = 0; i < 20; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            NSLog(@"%i",[self getName]);
        });
    }
}

- (void)setName:(int)name {
    
    dispatch_barrier_async(_readQue, ^{
        self->_name = name;
        self.setCount++;
        NSLog(@"设置%i",self.setCount);
    });
}

- (int)getName {
    __block int data;
    dispatch_sync(_readQue, ^{
        data = self->_name;
    });
    return data;
}

以上是GCD实现读写锁的案例:
(1)创建一个并发队列
(2)写的任务用异步栅栏函数执行
(3)读的任务用同步函数执行
另外,这里我没有重写get方法,为了避免在++的操作时,同时调用set和get方法。
如果此时用要重写get方法,需要加入@synthesize name = _name; 因为同时重写get、set方法时,编译器不会生成get、set方法。

3、使用pthread_rwlock

- (void)test1 {
    pthread_rwlock_init(&_rwlock, NULL);
    for (int i = 0; i < 200; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            pthread_rwlock_wrlock(&self->_rwlock);
            self.count ++;
            pthread_rwlock_unlock(&self->_rwlock);
        });
    }

    for (int i = 0; i < 200; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            pthread_rwlock_rdlock(&self->_rwlock);
            NSLog(@"%i",self.count);
            pthread_rwlock_unlock(&self->_rwlock);
        });
    }
}
- (void)dealloc {
    pthread_rwlock_destroy(&_rwlock);
}

经过测试:当同时用大量线程去进行读写操作时,GCD的同步任务+栅栏会造成线程死锁。pthread_rwlock方案一切正常。所以如果是追求性能的情况下,使用pthread_rwlock更优。