前言

一、GCD初始化函数

二、获取当前GCD队列名称

三、关于_dispatch_queue_get_current()的思考

四、GCD单利原理

五、队列的创建流程

六、异步函数分析

七、异步函数+并发队列


前面对GCD基础内容和NSOperation&NSOperationQueue都做了总结,接下来继续研究GCD的底层原理。在继续分析源码之前,由于GCD的源码确实很复杂。如果每句代码都分析,对我来说非常不现实,所以这里就只分析业务上面常用的功能函数。


一、GCD初始化函数

首先是GCD的初始化libdispatch_init。这个函数在之前研究dyld源码时候就遇到过,它是由libSystem_initializer函数里面进行调用的。
libdispatch_init核心代码如下:

DISPATCH_EXPORT DISPATCH_NOTHROW
void libdispatch_init(void) {
  //.....省略
#if HAVE_PTHREAD_WORKQUEUE_QOS
    //设置主线程的优先级 qos_class_main() = QOS_CLASS_USER_INTERACTIVE
    dispatch_qos_t qos = _dispatch_qos_from_qos_class(qos_class_main());
    _dispatch_main_q.dq_priority = _dispatch_priority_make(qos, 0);
#endif
  //.....省略
#if DISPATCH_USE_THREAD_LOCAL_STORAGE
    //线程私有数据存储
    _dispatch_thread_key_create(&__dispatch_tsd_key, _libdispatch_tsd_cleanup);
#endif
  //.....省略
    //设置当前主队列
    _dispatch_queue_set_current(&_dispatch_main_q);
    _dispatch_queue_set_bound_thread(&_dispatch_main_q);
    //一系列的初始化
    _dispatch_hw_config_init();
    _dispatch_time_init();
    _dispatch_vtable_init();
    //此函数会去初始化objc库
    _os_object_init();
    _voucher_init();
    _dispatch_introspection_init();
}

初始化函数中对主队列进行了设置 ,搜索_dispatch_main_q如下:

struct dispatch_queue_static_s _dispatch_main_q = {
	DISPATCH_GLOBAL_OBJECT_HEADER(queue_main),
#if !DISPATCH_USE_RESOLVERS
	.do_targetq = _dispatch_get_default_queue(true),
#endif
	.dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
			DISPATCH_QUEUE_ROLE_BASE_ANON,
	.dq_label = "com.apple.main-thread",
	.dq_atomic_flags = DQF_THREAD_BOUND | DQF_WIDTH(1),
	.dq_serialnum = 1,
};

可以看到_dispatch_main_q是一个结构体。我们 po 一下dispatch_get_main_queue():

<OS_dispatch_queue_main: com.apple.main-thread[0x10bf63d40] 
= { xref = -2147483648, ref = -2147483648, sref = 1, 
target = com.apple.root.default-qos.overcommit[0x10bf64340], 
width = 0x1, state = 0x001ffe9000000100, dirty, in-flight = 0, 
thread = 0x103 }>

打印内容和结构体是一一对照的。
另外在gcd的源码中,定义很多没有见过的结构体或者联合体,就像_dispatch_main_q是dispatch_queue_static_s。
再比如dispatch_object_t ,它是一个联合体,起到了抽象类的作用。

typedef union {
	struct _os_object_s *_os_obj;
	struct dispatch_object_s *_do;
	struct dispatch_queue_s *_dq;
	struct dispatch_queue_attr_s *_dqa;
	struct dispatch_group_s *_dg;
	struct dispatch_source_s *_ds;
	struct dispatch_channel_s *_dch;
	struct dispatch_mach_s *_dm;
	struct dispatch_mach_msg_s *_dmsg;
	struct dispatch_semaphore_s *_dsema;
	struct dispatch_data_s *_ddata;
	struct dispatch_io_s *_dchannel;

	struct dispatch_continuation_s *_dc;
	struct dispatch_sync_context_s *_dsc;
	struct dispatch_operation_s *_doperation;
	struct dispatch_disk_s *_ddisk;
	struct dispatch_workloop_s *_dwl;
	struct dispatch_lane_s *_dl;
	struct dispatch_queue_static_s *_dsq;
	struct dispatch_queue_global_s *_dgq;
	struct dispatch_queue_pthread_root_s *_dpq;
	dispatch_queue_class_t _dqu;
	dispatch_lane_class_t _dlu;
	uintptr_t _do_value;
} dispatch_object_t

二、获取当前GCD队列名称

我们可以通过dispatch_queue_get_label获取当前任务所在的队列名称。

  NSString *queName = [NSString stringWithCString:dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL) 
  encoding:NSUTF8StringEncoding];
    NSLog(@"%@",queName);

打印结果为:com.apple.main-thread。

那么dispatch_queue_get_label是如何获取到当前队列名称的呢?继续看源码:

#define DISPATCH_CURRENT_QUEUE_LABEL NULL
 //获取队列名称函数
 const char *
 dispatch_queue_get_label(dispatch_queue_t dq)
 {
     if (unlikely(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
         dq = _dispatch_queue_get_current_or_default();
     }
     return dq->dq_label ? dq->dq_label : "";
 }

当我们传的参数不为DISPATCH_CURRENT_QUEUE_LABEL(也就是NULL)时,会直接返回dq->dq_label,这个不难理解。
当我们传DISPATCH_CURRENT_QUEUE_LABEL时,会走_dispatch_queue_get_current_or_default函数:


enum {
	DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS = 0,
	DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT,
	DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS,
	DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT,
	DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS,
	DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT,
	DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS,
	DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT, //7
	DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS,
	DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT,
	DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS,
	DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT,
	_DISPATCH_ROOT_QUEUE_IDX_COUNT,
};


DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_queue_get_current_or_default(void)
{
	int idx = DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT;
	return _dispatch_queue_get_current() ?: _dispatch_root_queues[idx]._as_dq;
}

此时会走2个分支:_dispatch_queue_get_current() 和 _dispatch_root_queues[idx]._as_dq。

(1)_dispatch_root_queues 全局队列集合


struct dispatch_queue_global_s _dispatch_root_queues[] = {
    //..... .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \
    
	_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0,
		.dq_label = "com.apple.root.maintenance-qos",
		.dq_serialnum = 4,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
		.dq_label = "com.apple.root.maintenance-qos.overcommit",
		.dq_serialnum = 5,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0,
		.dq_label = "com.apple.root.background-qos",
		.dq_serialnum = 6,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
		.dq_label = "com.apple.root.background-qos.overcommit",
		.dq_serialnum = 7,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0,
		.dq_label = "com.apple.root.utility-qos",
		.dq_serialnum = 8,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
		.dq_label = "com.apple.root.utility-qos.overcommit",
		.dq_serialnum = 9,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK,
		.dq_label = "com.apple.root.default-qos",
		.dq_serialnum = 10,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
			DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
		.dq_label = "com.apple.root.default-qos.overcommit",
		.dq_serialnum = 11,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0,
		.dq_label = "com.apple.root.user-initiated-qos",
		.dq_serialnum = 12,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
		.dq_label = "com.apple.root.user-initiated-qos.overcommit",
		.dq_serialnum = 13,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0,
		.dq_label = "com.apple.root.user-interactive-qos",
		.dq_serialnum = 14,
	),
	_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
		.dq_label = "com.apple.root.user-interactive-qos.overcommit",
		.dq_serialnum = 15,
	),
};

_dispatch_root_queues是一个数组,可以取出不同优先级的全局队列。
另外它和我们自己创建的并发队列有一个区别就是dq_atomic_flags的值:

#define DISPATCH_QUEUE_WIDTH_FULL			0x1000ull //4096
#define DISPATCH_QUEUE_WIDTH_POOL (DISPATCH_QUEUE_WIDTH_FULL - 1) //4095 (全局队列)
#define DISPATCH_QUEUE_WIDTH_MAX  (DISPATCH_QUEUE_WIDTH_FULL - 2) //4094 (普通并发队列)

(2)什么情况下会走_dispatch_root_queues[idx]._as_dq流程?

在NSThread开启的子线程中去获取当前队列:

- (void)testThreadGetQue {
    [NSThread detachNewThreadWithBlock:^{
        NSString *queName = [NSString stringWithCString:dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL) encoding:NSUTF8StringEncoding];
        NSLog(@"%@",queName);
    }];
}

打印结果为: com.apple.root.default-qos.overcommit

因为DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT = 7,所以_dispatch_root_queues数组中索引为7的全局队列是:

_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
			DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
		.dq_label = "com.apple.root.default-qos.overcommit",
		.dq_serialnum = 11,
	),

它的dq_label也就是我们打印出来的“com.apple.root.default-qos.overcommit”。
分析到这里也并不能证明一定是走的_dispatch_root_queues[idx]._as_dq流程,因为_dispatch_queue_get_current() 也有可能会获取到名称为“com.apple.root.default-qos.overcommit”的队列。

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_queue_get_current(void)
{
	return (dispatch_queue_t)_dispatch_thread_getspecific(dispatch_queue_key);
}

DISPATCH_TSD_INLINE
static inline void *
_dispatch_thread_getspecific(pthread_key_t k)
{
#if DISPATCH_USE_DIRECT_TSD
	if (_pthread_has_direct_tsd()) {
		return _pthread_getspecific_direct(k);
	}
#endif
	return pthread_getspecific(k);
}

由以上代码可知,当前队列的信息是保存在线程的私有数据中的,而对应的key为dispatch_queue_key。

static const unsigned long dispatch_queue_key		= __PTK_LIBDISPATCH_KEY0;

__PTK_LIBDISPATCH_KEY0的值,我经过查阅资料和符号断点调试,发现是在libpthread源码中的。

#define __PTK_LIBDISPATCH_KEY0		20

接下来,下两个符号断点:dispatch_queue_get_label 和 pthread_getspecific
从而验证一下:“在通过NSThread创建的子线程中获取当前队列的名称,是否一定走的是_dispatch_root_queues[idx]._as_dq。”
断点来到dispatch_queue_get_label之后,继续跳转到pthread_getspecific。

此时读取寄存器:

可以看到rdi对应到参数 0x000000014,就是20。继续跟断点:


此时发现,rax中的返回值就是0x000000000,证明_dispatch_queue_get_current()为NULL,默认返回对应的全局队列了。


三、关于_dispatch_queue_get_current()的思考

通过上面的分析可以知道,队列的信息是通过pthread_getspecific来获取的。

线程私有数据(tsd 或者 tls)可以保证线程安全,比通过加锁保证线程安全的方式更加高效。
(和java中的ThreadLocal一样的)。
它可以在不同的线程中,用同一个key取获取不同的值。

但是当我在“串行队列同步函数”中取获取“当前队列名称”的时候,问题来了:

此时在主线程中,并且获取私有数据时的key也是dispatch_queue_key。那么为什么能获取到当前串行队列的名称,而不是主队列的名称呢?难道用pthread_getspecific函数,在同一个线程中,用同一个key,还能获取不同的value?
通过查看libpthread的源码:

找到了一点线索,肯定是pthread的底层根据当前所在的上下文环境对其进行了判断处理,这里就不深入分析了,后面有时间再看。

总结:

1.pthread_setspecific 和 pthread_getspecific 可以设置将一个key,在不同的线程里设置不同的value。

2.当pthread_key_t是系统指定的key时,在相同线程下,通过pthread_getspecific用同一个key获取的value值不一定相同。(比如__PTK_LIBDISPATCH_KEY0,在相同线程下,不同队列中,可以通过pthread_getspecific(20)获取当前队列的对象。)


四、GCD单利原理

研究GCD队列和GCD同步/异步函数之前,先来研究一下单利的实现原理。

1、当我们不使用dispatch_once来实现单利的时候,我们通常会这么写:

@interface SingleNoGCDObject : NSObject

@property (nonatomic, copy) NSString *name;

+ (SingleNoGCDObject *)getInstance;

@end

@implementation SingleNoGCDObject

static SingleNoGCDObject *instance;
//双重检查
+ (SingleNoGCDObject *)getInstance {
    if (instance == nil) {
        @synchronized (self) {
            if (instance == nil) {
                instance = [[SingleNoGCDObject alloc] init];
            }
        }
    }
    return instance;
}
@end

其中第一次做instance == nil的判断是因为: 效率问题。

假设第一次判空不加,那么每次进入这个方法,instance不论是不是nil,都会执行下面的synchronized代码块,多线程下会出现锁的竞争,从而浪费性能。

第二次做instance == nil的判断是因为:防止多次初始化

多线程下,可能会出现两个线程都经过了前面第一次检查,来到了下面的synchronized这里,如果不判空,就会出现一个线程alloc了一个SingleNoGCDObject出来,然后释放锁,第二个线程进来又会alloc一个SingleNoGCDObject出来。

这两次判空检查在java中叫DCL(双重检查机制)

在java中还需要对instance静态变量加上volatile,防止编译器指令重排,而ios此处不会进行指令重排,所以不用加以volatile修饰。

volatile在java和c中的区别:

volatile 在 C 环境下仅仅是编译层面的内存屏障,仅能保证编译器不优化和重排被 volatile 修饰的内容,而在 Java 环境下 volatile 还具有 CPU 层面的内存屏障作用。

2、当我们使用GCD时,只需要这么写:


@interface SingleGCDObject : NSObject

@property (nonatomic, copy) NSString *name;

+ (SingleGCDObject *)getInstance;

@end

@implementation SingleGCDObject

+ (SingleGCDObject *)getInstance {
    static SingleGCDObject *instance;
    static dispatch_once_t onceToken;
    NSLog(@"1:%ld", onceToken);
    dispatch_once(&onceToken, ^{
        instance = [[SingleGCDObject alloc] init];
        NSLog(@"2:%ld", onceToken);
    });
    NSLog(@"3:%ld", onceToken);
    return instance;
}
//打印结果为:   1:0     2:256   3:-1
@end

当使用dispatch_once时,既没有对instance进行判空,也没有看到加锁的操作,就是这么简单的实现了单利。接下来,带着3个问题,来研究源码:(1)为什么只执行一次 (2)如何保证线程安全 (3)如何销毁单利。

#ifdef __BLOCKS__
//单利
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
	dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}
#endif

核心函数:

/*
 val key
 ctxt block
 func 回调函数
 */
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
	
	//强转为dispatch_once_gate_t 结构体
	/*
	 typedef struct dispatch_once_gate_s {
		 //联合体
		 union {
			 dispatch_gate_s dgo_gate;
			 uintptr_t dgo_once;
		 };
	 } dispatch_once_gate_s, *dispatch_once_gate_t;
	 */
	dispatch_once_gate_t l = (dispatch_once_gate_t)val;
	

#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
	//如果dgo_once == DLOCK_ONCE_DONE,则拦截返回
	if (likely(v == DLOCK_ONCE_DONE)) {
		//判断拦截
		return;
	}
	
	
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	if (likely(DISPATCH_ONCE_IS_GEN(v))) {
		return _dispatch_once_mark_done_if_quiesced(l, v);
	}
#endif
	
	
#endif
	if (_dispatch_once_gate_tryenter(l)) {
		//在这里回调
		return _dispatch_once_callout(l, ctxt, func);
	}
	return _dispatch_once_wait(l);
}

1、流程总结:

(1)底层会调用dispatch_once_t内部函数,传入的参数分别为:dispatch_once_t predicate、block 以及 block的函数指针。

(2)当前传入的变量是一个全局静态变量,每一个单利对应不同且唯一的一个变量。然后会将此变量val(predicate)封装成 dispatch_once_gate_t结构体。并使用类似KVC的形式,通过os_atomic_load出来对应变量的值。如果取出来的值为DLOCK_ONCE_DONE,表示已经处理过一次了,就直接return了。

(3)为了保证线程安全,通过_dispatch_once_gate_tryenter把自己锁起来,保证当前任务执行的唯一,防止相同的onceToken进行多次执行。

static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
	//比较 DLOCK_ONCE_UNLOCKED 和原子变量 &l->dgo_once 中的值,如果相等,那么就把 (uintptr_t)_dispatch_lock_value_for_self() 赋给原子变量。返回旧的原子变量 &l->dgo_once 中的值
	return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
			(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}

(4)锁住之后进行block的调用。


static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
    dispatch_function_t func)
{
  _dispatch_client_callout(ctxt, func);
    // 处理完成之后 进行广播 
  _dispatch_once_gate_broadcast(l);
}

(5)block调用完之后,进行广播,将 v 的值 置为 DLOCK_ONCE_DONE,下次就不会在进入调用block流程。从而保证了唯一性。

static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
  dispatch_lock value_self = _dispatch_lock_value_for_self();
  uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
  v = _dispatch_once_mark_quiescing(l);
#else
  v = _dispatch_once_mark_done(l);
#endif
  if (likely((dispatch_lock)v == value_self)) return;
  _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}

...

static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
    // 先匹配 再改变为 DLOCK_ONCE_DONE 的状态
    // 下次 判断为 DLOCK_ONCE_DONE 状态,就无法进来
    return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}

至此,为什么只执行一次和线程安全的问题就知道了。

2、销毁单利

当我们不使用dispatch_once设置单利时,只需要把instance静态变量设置为nil就可以了。
当我们使用dispatch_once时,还需要将predicate(onceToken)的值设置为0(相当于清除了底层DLOCK_ONCE_DONE的设置),代码如下:

@implementation SingleGCDObject

static dispatch_once_t onceToken;
static SingleGCDObject *aInstance;

+ (SingleGCDObject *)getInstance {
   
   NSLog(@"1:%ld", onceToken);
   dispatch_once(&onceToken, ^{
       aInstance = [[SingleGCDObject alloc] init];
       NSLog(@"2:%ld", onceToken);
   });
   NSLog(@"3:%ld", onceToken);
   return aInstance;
}

- (void)releaseInstance {
   onceToken = 0;
   aInstance = nil;
}
@end

五、队列的创建流程

接下来继续研究dispatch_queue_create源码,分析队列是如何创建的。
精简一下源码:

/*
 创建队列核心方法
 label 队列名称
 dqa 队列的属性
 tq 目标队列
 legacy
 */
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
        dispatch_queue_t tq, bool legacy)
{
    //设置队列的属性信息
    dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
    //...省略
    // Step 1: Normalize arguments (qos, overcommit, tq)
    // 1.规范化参数
    //处理服务质量
    dispatch_qos_t qos = dqai.dqai_qos;
#if !HAVE_PTHREAD_WORKQUEUE_QOS
    if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
        dqai.dqai_qos = qos = DISPATCH_QOS_USER_INITIATED;
    }
    if (qos == DISPATCH_QOS_MAINTENANCE) {
        dqai.dqai_qos = qos = DISPATCH_QOS_BACKGROUND;
    }
#endif // !HAVE_PTHREAD_WORKQUEUE_QOS
    //...省略
    if (!tq) {
        //获取一个管理自己队列的root队列。
        tq = _dispatch_get_root_queue(
                qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
                overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq;
        if (unlikely(!tq)) {
            DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
        }
    }
    // Step 2: Initialize the queue
    // 2.初始化队列
//    legacy默认是true的
    if (legacy) {
        // if any of these attributes is specified, use non legacy classes
//        如果指定了这些属性中的任何一个,请使用非遗留类
        if (dqai.dqai_inactive || dqai.dqai_autorelease_frequency) {
            legacy = false;
        }
    }

    const void *vtable;
    dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
    //判断并发/串行
    if (dqai.dqai_concurrent) {
//        OS_dispatch_##name##_class =>  OS_dispatch_queue_concurrent_class
        vtable = DISPATCH_VTABLE(queue_concurrent);
    } else {
//        OS_dispatch_##name##_class =>  OS_dispatch_queue_serial_class
        vtable = DISPATCH_VTABLE(queue_serial);
    }
    //....
    //alloc/init
    dispatch_lane_t dq = _dispatch_object_alloc(vtable,
            sizeof(struct dispatch_lane_s));
    _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
            DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
            (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

    //设置队列名称
    dq->dq_label = label;
    //设置队列优先级
    dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
            dqai.dqai_relpri);
    
    if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
        dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    }
    
    if (!dqai.dqai_inactive) {
        _dispatch_queue_priority_inherit_from_target(dq, tq);
        _dispatch_lane_inherit_wlh_from_target(dq, tq);
    }
    //增加引用计数
    _dispatch_retain(tq);
    //自定义的queue的目标队列是root队列
    dq->do_targetq = tq;
    // _dispatch_introspection_queue_create
    return _dispatch_trace_queue_create(dq)._dq;
}

通过源码总结创建流程:

1、设置队列的属性信息

//设置队列的属性信息
dispatch_queue_attr_info_t
_dispatch_queue_attr_to_info(dispatch_queue_attr_t dqa)
{
	dispatch_queue_attr_info_t dqai = { };
    //如果dqa为null,直接返回,也就证明了如果上层传入的是NULL,就默认是串行。
	if (!dqa) return dqai;
#if DISPATCH_VARIANT_STATIC
	if (dqa == &_dispatch_queue_attr_concurrent) {
        //并发
		dqai.dqai_concurrent = true;
		return dqai;
	}
#endif
    //... 默认定义dqa省略
    size_t idx = (size_t)(dqa - _dispatch_queue_attrs);
    //....
	dqai.dqai_concurrent = !(idx % DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT);
	idx /= DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT;
    //....省略
	dqai.dqai_qos = idx % DISPATCH_QUEUE_ATTR_QOS_COUNT;
	idx /= DISPATCH_QUEUE_ATTR_QOS_COUNT;
    //....省略
	return dqai;
}

通过对传入的dqa来判断返回队列的属性信息,包括是否是并发队列,队列的优先级等。

//位域
typedef struct dispatch_queue_attr_info_s {
	dispatch_qos_t dqai_qos : 8;
	int      dqai_relpri : 8;
	uint16_t dqai_overcommit:2;
	uint16_t dqai_autorelease_frequency:2;
	uint16_t dqai_concurrent:1;
	uint16_t dqai_inactive:1;
} dispatch_queue_attr_info_t;

dispatch_queue_attr_info_t 是一个结构体位域。
另外需要注意的是,tq是根据qos和overcommit创建的一个全局队列,用来作为当前队列的管理队列。

2、规范化参数

主要根据上一步返回的属性信息dqai,来初始化将要创建队列的参数:服务质量、管理队列等。

3、队列的创建

(1)首先通过dqai.dqai_concurrent来判断是否串行队列还是并发队列。

#define DISPATCH_VTABLE(name) DISPATCH_OBJC_CLASS(name)
#define DISPATCH_OBJC_CLASS(name)	(&DISPATCH_CLASS_SYMBOL(name))
#define DISPATCH_CLASS_SYMBOL(name) OS_dispatch_##name##_class

通过以上的宏定义,可以得到:
OS_dispatch_##name##class => OS_dispatch_queue_serial_class
OS_dispatch
##name##_class => OS_dispatch_queue_concurrent_class
得到类名称之后,通过_dispatch_object_alloc和_dispatch_queue_init来进行队列的创建和初始化。

void *
_dispatch_object_alloc(const void *vtable, size_t size)
{
#if OS_OBJECT_HAVE_OBJC1
   //.....省略
#else
   return _os_object_alloc_realized(vtable, size);
#endif
}

inline _os_object_t
_os_object_alloc_realized(const void *cls, size_t size)
{
   _os_object_t obj;
   dispatch_assert(size >= sizeof(struct _os_object_s));
   while (unlikely(!(obj = calloc(1u, size)))) {
   	_dispatch_temporary_resource_shortage();
   }
   obj->os_obj_isa = cls;
   return obj;
}

可以看到执行了calloc 和 isa的赋值。

4、包装返回队列

#define _dispatch_trace_queue_create _dispatch_introspection_queue_create

dispatch_queue_class_t
_dispatch_introspection_queue_create(dispatch_queue_t dq)
{
   dispatch_queue_introspection_context_t dqic;
   size_t sz = sizeof(struct dispatch_queue_introspection_context_s);
   //...省略
   dqic = _dispatch_calloc(1, sz);
   dqic->dqic_queue._dq = dq;
   //...省略
   dq->do_introspection_ctxt = dqic;
   //....省略
   if (DISPATCH_INTROSPECTION_HOOK_ENABLED(queue_create)) {
   	_dispatch_introspection_queue_create_hook(dq);
   }
   return upcast(dq)._dqu;
}

通过_dispatch_trace_queue_create->_dispatch_introspection_queue_create函数,包装并返回队列。
继续看_dispatch_introspection_queue_create_hook

DISPATCH_NOINLINE
static void
_dispatch_introspection_queue_create_hook(dispatch_queue_t dq)
{
	dispatch_introspection_queue_s diq;
	diq = dispatch_introspection_queue_get_info(dq);
	dispatch_introspection_hook_callout_queue_create(&diq);
}

DISPATCH_USED inline
dispatch_introspection_queue_s
dispatch_introspection_queue_get_info(dispatch_queue_t dq)
{
	if (dx_metatype(dq) == _DISPATCH_WORKLOOP_TYPE) {
		return _dispatch_introspection_workloop_get_info(upcast(dq)._dwl);
	}
	return _dispatch_introspection_lane_get_info(upcast(dq)._dl);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_introspection_queue_s
_dispatch_introspection_lane_get_info(dispatch_lane_class_t dqu)
{
    //....省略
	dispatch_introspection_queue_s diq = {
		.queue = dq->_as_dq,
		.target_queue = dq->do_targetq,
		.label = dq->dq_label,
		.serialnum = dq->dq_serialnum,
		.width = dq->dq_width,
		.suspend_count = _dq_state_suspend_cnt(dq_state) + dq->dq_side_suspend_cnt,
		.enqueued = _dq_state_is_enqueued(dq_state) && !global,
		.barrier = _dq_state_is_in_barrier(dq_state) && !global,
		.draining = (dq->dq_items_head == (void*)~0ul) ||
				(!dq->dq_items_head && dq->dq_items_tail),
		.global = global,
		.main = dx_type(dq) == DISPATCH_QUEUE_MAIN_TYPE,
	};
	return diq;
}

可以看到,队列是以dispatch_introspection_queue_s为模版创建的。


六、异步函数分析

了解了队列的创建流程之后,继续来研究异步函数的执行流程,先来看下dispatch_async的底层函数:

#ifdef __BLOCKS__
/*
dq 当前队列
work block任务
*/
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    //任务包装器
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
#endif

1、 对执行任务和参数的包装

首先会创建一个dispatch_continuation_t,它是dispatch_continuation_s的结构体指针

typedef struct dispatch_continuation_s *dispatch_continuation_t;
typedef struct dispatch_continuation_s {
	DISPATCH_CONTINUATION_HEADER(continuation);
} *dispatch_continuation_t;

它内部主要包含了dc_ctxt(也就是我们传进去的block)、dc_func(block中的函数指针)、dc_flags (标识)以及 dc_priority(优先级)。

继续看_dispatch_continuation_init

DISPATCH_ALWAYS_INLINE
/*
 dc 任务包装器
 dqu 当前队列
 work 任务
 flags/dc_flags 标识
*/
static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
		dispatch_queue_class_t dqu, dispatch_block_t work,
		dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    //block copy
	void *ctxt = _dispatch_Block_copy(work);
	dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
	if (unlikely(_dispatch_block_has_private_data(work))) {
	    //....省略
	}
    //获取block函数指针
	dispatch_function_t func = _dispatch_Block_invoke(work);
	if (dc_flags & DC_FLAG_CONSUME) {
		func = _dispatch_call_block_and_release;
	}
	return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}

/*
 dc 任务包装器
 dqu 当前队列
 ctxt block
 f block的函数指针
 flags/dc_flags 标识
 */
static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
		dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
		dispatch_block_flags_t flags, uintptr_t dc_flags)
{
	pthread_priority_t pp = 0;
    //对dc的成员进行赋值
	dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
	dc->dc_func = f;
	dc->dc_ctxt = ctxt;
	// in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
	// should not be propagated, only taken from the handler if it has one
	if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
		pp = _dispatch_priority_propagate();
	}
    //
	_dispatch_continuation_voucher_set(dc, flags);
	return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}
/*
 dc 任务包装器
 dqu 当前队列
 pp 优先级
 flags 标识
 */
static inline dispatch_qos_t
_dispatch_continuation_priority_set(dispatch_continuation_t dc,
		dispatch_queue_class_t dqu,
		pthread_priority_t pp, dispatch_block_flags_t flags)
{
	dispatch_qos_t qos = DISPATCH_QOS_UNSPECIFIED;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	dispatch_queue_t dq = dqu._dq;

	if (likely(pp)) {
		bool enforce = (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS);
		bool is_floor = (dq->dq_priority & DISPATCH_PRIORITY_FLAG_FLOOR);
		bool dq_has_qos = (dq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK);
		if (enforce) {
			pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
			qos = _dispatch_qos_from_pp_unsafe(pp);
		} else if (!is_floor && dq_has_qos) {
			pp = 0;
		} else {
			qos = _dispatch_qos_from_pp_unsafe(pp);
		}
	}
	dc->dc_priority = pp;
#else
	(void)dc; (void)dqu; (void)pp; (void)flags;
#endif
	return qos;
}

先总结一下以上dc的创建流程:
_dispatch_continuation_init -> _dispatch_continuation_init_f-> _dispatch_continuation_priority_set

这个流程主要是用来包装任务,将block、block的函数指针、优先级等都赋值给dc,并同时创建返回一个qos。

经过了对dc的一系列的初始化赋值,我们在进入_dispatch_continuation_async核心函数之前,就有了必要的参数:dq(当前队列)、dc(任务包装器)、qos(服务质量)。

2、_dispatch_continuation_async的调用

static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
		dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
	if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
		_dispatch_trace_item_push(dqu, dc);
	}
#else
	(void)dc_flags;
#endif
	return dx_push(dqu._dq, dc, qos);
}

其中核心的就是dx_push宏定义:

#define dx_vtable(x) (&(x)->do_vtable->_os_obj_vtable)
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

dx_vtable就是根据当前队列,拿到具体的队列类型,然后去调用dq_push。这样的宏定义对于上层调用来说会比较有便利性。
直接全局搜索dq_push:

//....
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_serial, lane,
	.do_type        = DISPATCH_QUEUE_SERIAL_TYPE,
	.do_dispose     = _dispatch_lane_dispose,
	.do_debug       = _dispatch_queue_debug,
	.do_invoke      = _dispatch_lane_invoke,

	.dq_activate    = _dispatch_lane_activate,
	.dq_wakeup      = _dispatch_lane_wakeup,
	.dq_push        = _dispatch_lane_push,
);

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
	.do_type        = DISPATCH_QUEUE_CONCURRENT_TYPE,
	.do_dispose     = _dispatch_lane_dispose,
	.do_debug       = _dispatch_queue_debug,
	.do_invoke      = _dispatch_lane_invoke,

	.dq_activate    = _dispatch_lane_activate,
	.dq_wakeup      = _dispatch_lane_wakeup,
	.dq_push        = _dispatch_lane_concurrent_push,
);

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
	.do_type        = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
	.do_dispose     = _dispatch_object_no_dispose,
	.do_debug       = _dispatch_queue_debug,
	.do_invoke      = _dispatch_object_no_invoke,

	.dq_activate    = _dispatch_queue_no_activate,
	.dq_wakeup      = _dispatch_root_queue_wakeup,
	.dq_push        = _dispatch_root_queue_push,
);

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_main, lane,
	.do_type        = DISPATCH_QUEUE_MAIN_TYPE,
	.do_dispose     = _dispatch_lane_dispose,
	.do_debug       = _dispatch_queue_debug,
	.do_invoke      = _dispatch_lane_invoke,

	.dq_activate    = _dispatch_queue_no_activate,
	.dq_wakeup      = _dispatch_main_queue_wakeup,
	.dq_push        = _dispatch_main_queue_push,
);
//....

以上是列举出来的串行队列、并发队列、全局队列、主动队列。
先测一下并发队列,是否执行了_dispatch_lane_concurrent_push。

dispatch_queue_t concurrentQ = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(concurrentQ, ^{
    NSLog(@"==%@",[NSThread currentThread]);
});

下符号断点,然后打印如下:

其他队列也会执行相应的dq_push函数。


七、异步函数+并发队列

以上是异步函数的底层调用流程,那么在异步函数中线程是如何开启的以及任务是如何调用的呢?
如果异步函数传入的队列是同步队列,则会执行_dispatch_lane_push
如果异步函数传入的队列是并发队列,则会执行_dispatch_lane_concurrent_push
以(异步函数+并发队列)来继续分析:

DISPATCH_NOINLINE
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
		dispatch_qos_t qos)
{
	//...
	if (dq->dq_items_tail == NULL &&
			!_dispatch_object_is_waiter(dou) &&
			!_dispatch_object_is_barrier(dou) &&
			_dispatch_queue_try_acquire_async(dq)) {
		return _dispatch_continuation_redirect_push(dq, dou, qos);
	}
	_dispatch_lane_push(dq, dou, qos);
}

通过符号断点跟踪,异步函数会走_dispatch_continuation_redirect_push,而异步栅栏函数会走_dispatch_lane_push。我们这里先分析_dispatch_continuation_redirect_push:

DISPATCH_NOINLINE
static void
_dispatch_continuation_redirect_push(dispatch_lane_t dl,
		dispatch_object_t dou, dispatch_qos_t qos)
{
	if (likely(!_dispatch_object_is_redirection(dou))) {
		dou._dc = _dispatch_async_redirect_wrap(dl, dou);
	} else if (!dou._dc->dc_ctxt) {
		//...
		dou._dc->dc_ctxt = (void *)
		(uintptr_t)_dispatch_queue_autorelease_frequency(dl);
	}
	dispatch_queue_t dq = dl->do_targetq;
	if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
	dx_push(dq, dou, qos);
}

可以看到在_dispatch_continuation_redirect_push函数中,又调用了dx_push,看似像递归一样。但是传入队列是原队列的do_targetq,其实就类似“类的继承链”或者“类的isa指向”一样。

那么并发队列的do_targetq是全局队列的一种,所以根据dq_push对应的全局队列类型:

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
	.do_type        = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
	.do_dispose     = _dispatch_object_no_dispose,
	.do_debug       = _dispatch_queue_debug,
	.do_invoke      = _dispatch_object_no_invoke,
	.dq_activate    = _dispatch_queue_no_activate,
	.dq_wakeup      = _dispatch_root_queue_wakeup,
	.dq_push        = _dispatch_root_queue_push,
);

会继续走 _dispatch_root_queue_push,通过符号断点调试也验证了确实会走此函数。

DISPATCH_NOINLINE
void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
		dispatch_qos_t qos)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
	//...省略
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
	if (_dispatch_root_queue_push_needs_override(rq, qos)) {
		return _dispatch_root_queue_push_override(rq, dou, qos);
	}
#else
	(void)qos;
#endif
	_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}

DISPATCH_NOINLINE
static void
_dispatch_root_queue_push_override(dispatch_queue_global_t orig_rq,
		dispatch_object_t dou, dispatch_qos_t qos)
{
	//...省略
	_dispatch_root_queue_push_inline(rq, dc, dc, 1);
}

直接来看_dispatch_root_queue_push_inline

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
		dispatch_object_t _head, dispatch_object_t _tail, int n)
{
	struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
	if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
		return _dispatch_root_queue_poke(dq, n, 0);
	}
}

继续来看_dispatch_root_queue_poke:

DISPATCH_NOINLINE
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
    //...省略
	return _dispatch_root_queue_poke_slow(dq, n, floor);
}

核心代码:

DISPATCH_NOINLINE
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
    int remaining = n;
    int r = ENOSYS;
    //初始化队列
    _dispatch_root_queues_init();
    //...省略
#if DISPATCH_USE_PTHREAD_POOL
    //...省略
    int can_request, t_count;
    // seq_cst with atomic store to tail <rdar://problem/16932833>
    //根据线程池大小来处理,判断需要创建多少条线程
    t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
    do {
        can_request = t_count < floor ? 0 : t_count - floor;
        if (remaining > can_request) {
            _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
                    remaining, can_request);
            os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
            remaining = can_request;
        }
        if (remaining == 0) {
            _dispatch_root_queue_debug("pthread pool is full for root queue: "
                    "%p", dq);
            return;
        }
    } while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
            t_count - remaining, &t_count, acquire));

    //...省略
    //循环创建线程
    do {
        _dispatch_retain(dq); // released in _dispatch_worker_thread
        while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
            if (r != EAGAIN) {
                (void)dispatch_assume_zero(r);
            }
            _dispatch_temporary_resource_shortage();
        }
    } while (--remaining);
//...省略
}

可以看到在_dispatch_root_queue_poke_slow函数中,有根据线程池来判断创建线程的逻辑,此时
(异步函数+并发队列)创建线程的流程就知道了。但是异步函数的任务是如何调度的呢?
视角转向上层,打印block的调用堆栈信息:


可以看到libsystem_pthread与libdispatch发生了交互,之后依次调用了:
_dispatch_worker_thread2 -> _dispatch_root_queue_drain -> _dispatch_continuation_pop -> _dispatch_client_callout -> _dispatch_call_block_and_release
先全局搜索一下_dispatch_worker_thread2,发现它是在_dispatch_root_queues_init_once函数中有使用:

static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
    //...省略
    #if DISPATCH_USE_INTERNAL_WORKQUEUE
        size_t i;
        //初始化线程池
        for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
            _dispatch_root_queue_init_pthread_pool(&_dispatch_root_queues[i], 0,
                    _dispatch_root_queues[i].dq_priority);
        }
    #else
    //...省略
    //线程任务配置
    struct pthread_workqueue_config cfg = {
        .version = PTHREAD_WORKQUEUE_CONFIG_VERSION,
        .flags = 0,
        .workq_cb = 0,
        .kevent_cb = 0,
        .workloop_cb = 0,
        .queue_serialno_offs = dispatch_queue_offsets.dqo_serialnum,
        //...省略
    };
    //...省略
    //指定任务回调函数
        cfg.workq_cb = _dispatch_worker_thread2;
        cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
        cfg.workloop_cb = (pthread_workqueue_function_workloop_t) _dispatch_workloop_worker_thread;
        r = pthread_workqueue_setup(&cfg, sizeof(cfg));
    //...省略
}

其中会进行线程池的初始化、线程任务配置、指定任务回调的函数以及通过底层的工作循环(workloop)去进行gcd内核通信。
而_dispatch_root_queues_init_once函数在_dispatch_root_queues_init中,是一个单利方法

static inline void
_dispatch_root_queues_init(void)
{
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init_once);
}

_dispatch_root_queues_init函数是_dispatch_root_queue_poke_slow中被调用的,所以当执行_dispatch_root_queue_poke_slow时,_dispatch_worker_thread2就会被绑定为线程回调函数。
知道了_dispatch_worker_thread2如何被调用之后,继续来看 _dispatch_continuation_pop

DISPATCH_NOINLINE
void
_dispatch_continuation_pop(dispatch_object_t dou, dispatch_invoke_context_t dic,
        dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu)
{
    _dispatch_continuation_pop_inline(dou, dic, flags, dqu._dq);
}

static inline void
_dispatch_continuation_pop_inline(dispatch_object_t dou,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
        dispatch_queue_class_t dqu)
{
    //拦截便于调试
    dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
            _dispatch_get_pthread_root_queue_observer_hooks();
    if (observer_hooks) observer_hooks->queue_will_execute(dqu._dq);
    flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
    //核心判断 
    if (_dispatch_object_has_vtable(dou)) {
        dx_invoke(dou._dq, dic, flags);
    } else {
        _dispatch_continuation_invoke_inline(dou, flags, dqu);
    }
    if (observer_hooks) observer_hooks->queue_did_execute(dqu._dq);
}

以上代码简单的分析一下(具体的分析必要性不太大(对自己而言)),其中dx_invoke和dx_push一样,也是根据不同的队列有相应的执行函数。
接下来看_dispatch_continuation_invoke_inline内,会发现调用了_dispatch_client_callout

void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    _dispatch_get_tsd_base();
    void *u = _dispatch_get_unwind_tsd();
    if (likely(!u)) return f(ctxt);
    _dispatch_set_unwind_tsd(NULL);
    f(ctxt);
    _dispatch_free_unwind_tsd();
    _dispatch_set_unwind_tsd(u);
}

核心就是f(xtxt),进行了block的调用,而此时的f也就是在_dispatch_continuation_init中赋值给func的_dispatch_call_block_and_release。

异步函数整体总结:

(1)调用_dispatch_continuation_init 来初始化包装任务
(2)由_dispatch_continuation_async走到dx_push,dx_push会根据队列执行不同的dp_push(对应的函数)。
(3)以并发队列来分析,_dispatch_lane_concurrent_push -> _dispatch_continuation_redirect_push 会开启线程,同时在_dispatch_root_queues_init中会去绑定线程回调函数_dispatch_worker_thread2
(4)通过堆栈分析 _dispatch_worker_thread2 -> _dispatch_root_queue_drain -> _dispatch_continuation_pop -> _dispatch_client_callout -> _dispatch_call_block_and_release
最终会通过_dispatch_client_callout去执行上层的block。