技术

agentic chat 图数据库的一些考量 LLM一些探索 Agent实践 LLM预训练 向量数据库的一些考量 fastapi+sqlalchemy进行项目开发 LLM微调实践 Python协程实现 Agent Functon Calling LLamaIndex入门 Multi-Agent探索 Python虚拟机 LLM工作流编排 Python实践 下一个平台Agent 激发LLM涌现——提示工程 LLM微调理论 大佬沉思 LLM外挂知识库 LLMOps 多模态LLM Python一些比较有意思的库 Transformers源码学习 LangChain源码学习 通用分布式计算引擎Ray Python并发 go依赖注入 go collection gc的基本原理 golang性能分析及优化 数据湖 高性能计算与存储 Linux2.1.13网络源代码学习 《大数据经典论文解读》 三驾马车学习 Spark 内存管理及调优 Yarn学习 从Spark部署模式开始讲源码分析 容器狂占内存资源怎么办? 多角度理解一致性 golang io使用及优化模式 Flink学习 c++学习 学习ebpf go设计哲学 ceph学习 学习mesh kvm虚拟化 学习MQ go编译器以及defer实现 学习go 为什么要有堆栈 汇编语言 计算机组成原理 运行时和库 Prometheus client mysql 事务 mysql 事务的隔离级别 mysql 索引 坏味道 学习分布式 学习网络 学习Linux go堆内存分配 golang 系统调用与阻塞处理 Goroutine 调度过程 重新认识cpu mosn有的没的 负载均衡泛谈 单元测试的新解读 《Redis核心技术与实现》笔记 《Prometheus监控实战》笔记 Prometheus 告警学习 calico源码分析 对容器云平台的理解 Prometheus 源码分析 并发的成本 基础设施优化 hashicorp raft源码学习 docker 架构 mosn细节 与微服务框架整合 Java动态代理 编程范式 并发通信模型 《网络是怎样连接的》笔记 go channel codereview gc分析 jvm 线程实现 go打包机制 go interface及反射 如何学习Kubernetes 《编译原理之美》笔记——后端部分 《编译原理之美》笔记——前端部分 Pilot MCP协议分析 go gc 内存管理玩法汇总 软件机制 istio流量管理 Pilot源码分析 golang io 学习Spring mosn源码浅析 MOSN简介 《datacenter as a computer》笔记 学习JVM Tomcat源码分析 Linux可观测性 学习存储 学计算 Gotty源码分析 kubernetes operator kaggle泰坦尼克问题实践 kubernetes扩缩容 神经网络模型优化 直觉上理解深度学习 如何学习机器学习 TIDB源码分析 什么是云原生 Alibaba Java诊断工具Arthas TIDB存储——TIKV 《Apache Kafka源码分析》——简介 netty中的线程池 guava cache 源码分析 Springboot 启动过程分析 Spring 创建Bean的年代变迁 Linux内存管理 自定义CNI IPAM 共识算法 spring redis 源码分析 kafka实践 spring kafka 源码分析 Linux进程调度 让kafka支持优先级队列 Codis源码分析 Redis源码分析 C语言学习 《趣谈Linux操作系统》笔记 docker和k8s安全访问机制 jvm crash分析 Prometheus 学习 Kubernetes监控 Kubernetes 控制器模型 容器日志采集 容器狂占资源怎么办? Kubernetes资源调度——scheduler 时序性数据库介绍及对比 influxdb入门 maven的基本概念 《Apache Kafka源码分析》——server Kubernetes类型系统 源码分析体会 《数据结构与算法之美》——算法新解 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 ansible学习 Kubernetes源码分析——从kubectl开始 jib源码分析之Step实现 线程排队 jib源码分析之细节 跨主机容器通信 jib源码分析及应用 为容器选择一个合适的entrypoint kubernetes yaml配置 《持续交付36讲》笔记 mybatis学习 程序猿应该知道的 无锁数据结构和算法 CNI——容器网络是如何打通的 为什么很多业务程序猿觉得数据结构和算法没用? 串一串一致性协议 当我在说PaaS时,我在说什么 《数据结构与算法之美》——数据结构笔记 PouchContainer技术分享体会 harbor学习 用groovy 来动态化你的代码 精简代码的利器——lombok 学习 《深入剖析kubernetes》笔记 编程语言那些事儿 rxjava3——背压 rxjava2——线程切换 spring cloud 初识 《深入拆解java 虚拟机》笔记 《how tomcat works》笔记 hystrix 学习 rxjava1——概念 Redis 学习 TIDB 学习 如何分发计算 Storm 学习 AQS1——论文学习 Unsafe Spark Stream 学习 linux vfs轮廓 《自己动手写docker》笔记 java8 实践 中本聪比特币白皮书 细读 区块链泛谈 比特币 大杂烩 总纲——如何学习分布式系统 hbase 泛谈 forkjoin 泛谈 看不见摸不着的cdn是啥 《jdk8 in action》笔记 程序猿视角看网络 bgp初识 calico学习 AQS——粗略的代码分析 我们能用反射做什么 web 跨域问题 《clean code》笔记 《Elasticsearch权威指南》笔记 mockito简介及源码分析 2017软件开发小结—— 从做功能到做系统 《Apache Kafka源码分析》——clients dns隐藏的一个坑 《mysql技术内幕》笔记 log4j学习 为什么netty比较难懂? 递归、回溯、动态规划 apollo client源码分析及看待面向对象设计 学习并发 docker运行java项目的常见问题 OpenTSDB 入门 spring事务小结 分布式事务 javascript应用在哪里 《netty in action》读书笔记 netty对http2协议的解析 ssl证书是什么东西 http那些事 苹果APNs推送框架pushy apple 推送那些事儿 编写java框架的几大利器 java内存模型和jvm内存布局 java exception Linux IO学习 netty内存管理 测试环境docker化实践 netty在框架中的使用套路 Nginx简单使用 《Linux内核设计的艺术》小结 Go并发机制及语言层工具 Linux网络源代码学习——数据包的发送与接收 《docker源码分析》小结 docker namespace和cgroup zookeeper三重奏 数据库的一些知识 Spark 泛谈 链式处理的那些套路 netty回顾 Thrift基本原理与实践(二) Thrift基本原理与实践(一) 回调 异步执行抽象——Executor与Future Docker0.1.0源码分析 java gc Jedis源码分析 深度学习泛谈 Linux网络命令操作 JTA与TCC 换个角度看待设计模式 Scala初识 向Hadoop学习NIO的使用 以新的角度看数据结构 并发控制相关的硬件与内核支持 systemd 简介 quartz 源码分析 基于docker搭建测试环境(二) spring aop 实现原理简述 自己动手写spring(八) 支持AOP 自己动手写spring(七) 类结构设计调整 分析log日志 自己动手写spring(六) 支持FactoryBean 自己动手写spring(九) 总结 自己动手写spring(五) bean的生命周期管理 自己动手写spring(四) 整合xml与注解方式 自己动手写spring(三) 支持注解方式 自己动手写spring(二) 创建一个bean工厂 自己动手写spring(一) 使用digester varnish 简单使用 关于docker image的那点事儿 基于docker搭建测试环境 分布式配置系统 JVM执行 git maven/ant/gradle/make使用 再看tcp kv系统 java nio的多线程扩展 《Concurrency Models》笔记 回头看Spring IOC IntelliJ IDEA使用 Java泛型 vagrant 使用 Go常用的一些库 Python初学 Goroutine 调度模型 虚拟网络 《程序员的自我修养》小结 Kubernetes存储 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件 Go基础 JVM类加载 硬币和扑克牌问题 LRU实现 virtualbox 使用 ThreadLocal小结 docker快速入门

架构

bert rerank微调 大模型推理tips RAG向量检索与微调 dddfirework源码分析 RAG与知识图谱 大模型推理服务框架vLLM 大模型推理服务框架 模型服务化(未完成) 大模型Post-Training 大模型训练 大模型推理 从Attention到Transformer k8s设备管理 ddd从理念到代码 如何应用LLM 小鼠如何驾驭大象(LLM)? 多类型负载协调员Koordinator controller-runtime细节分析 finops学习 kubevela多集群 kubevela中cue的应用 基于k8s的工作流 kubevela源码分析 容器和CPU那些事儿 数据集管理fluid 应用管理平台kubevela karmada支持crd 多集群管理 AutoML和AutoDL 特征平台 实时训练 分布式链路追踪 K8S YAML 资源清单管理方案 tensorflow原理——python层分析 如何学习tensorflow 数据并行——allreduce 数据并行——ps 推荐系统embedding原理及实践 机器学习中的python调用c 机器学习训练框架概述 tensornet源码分析 大模型训练和推理 X的生成——特征工程 tvm tensorflow原理——core层分析 模型演变 《深度学习推荐系统实战》笔记 keras 和 Estimator tensorflow分布式训练 分布式训练的一些问题 基于Volcano的弹性训练 图神经网络 pytorch弹性分布式训练 从混部到统一调度 从RNN到Attention pytorch分布式训练 CNN 《动手学深度学习》笔记 pytorch与线性回归 多活 volcano特性源码分析 推理服务 kubebuilder 学习 mpi 学习pytorch client-go学习 提高gpu 利用率 GPU与容器的结合 GPU入门 AI云平台梳理 tensorflow学习 tf-operator源码分析 k8s批处理调度/Job调度 喜马拉雅容器化实践 Kubernetes 实践 学习rpc BFF openkruise学习 可观察性和监控系统 基于Kubernetes选主及应用 《许式伟的架构课》笔记 Admission Controller 与 Admission Webhook 发布平台系统设计 k8s水平扩缩容 Scheduler如何给Node打分 Scheduler扩展 深入controller openkruise cloneset学习 controller-runtime源码分析 pv与pvc实现 csi学习 client-go informer源码分析 kubelet 组件分析 调度实践 Pod是如何被创建出来的? 《软件设计之美》笔记 mecha 架构学习 Kubernetes events学习及应用 CRI——kubelet与容器引擎之间的接口 资源调度泛谈 业务系统设计原则 grpc学习 元编程 以应用为中心 istio学习 下一代微服务Service Mesh 《实现领域驱动设计》笔记 概率论 serverless 泛谈 《架构整洁之道》笔记 处理复杂性 那些年追过的并发 服务器端编程 网络通信协议 架构大杂烩 如何学习架构 《反应式设计模式》笔记 项目的演化特点 反应式架构摸索 函数式编程的设计模式 服务化 ddd反模式——CRUD的败笔 研发效能平台 重新看面向对象设计 业务系统设计的一些体会 函数式编程 《左耳听风》笔记 业务程序猿眼中的微服务管理 DDD实践——CQRS 项目隔离——案例研究 《编程的本质》笔记 系统故障排查汇总及教训 平台支持类系统的几个点 代码腾挪的艺术 abtest 系统设计汇总 《从0开始学架构》笔记 初级权限系统设计 领域驱动理念 现有上传协议分析 移动网络下的文件上传要注意的几个问题 推送系统的几个基本问题 做配置中心要想好的几个基本问题 不同层面的异步 分层那些事儿 性能问题分析 用户认证问题 资源的分配与回收——池 消息/任务队列

标签

k8s设备管理 多类型负载协调员Koordinator controller-runtime细节分析 finops学习 kubevela多集群 kubevela中cue的应用 基于k8s的工作流 kubevela源码分析 容器和CPU那些事儿 数据集管理fluid 应用管理平台kubevela karmada支持crd 多集群管理 K8S YAML 资源清单管理方案 从混部到统一调度 volcano特性源码分析 kubebuilder 学习 client-go学习 tf-operator源码分析 k8s批处理调度/Job调度 喜马拉雅容器化实践 Kubernetes 实践 openkruise学习 基于Kubernetes选主及应用 Admission Controller 与 Admission Webhook k8s水平扩缩容 Scheduler如何给Node打分 Scheduler扩展 深入controller openkruise cloneset学习 controller-runtime源码分析 pv与pvc实现 csi学习 client-go informer源码分析 kubelet 组件分析 调度实践 Pod是如何被创建出来的? Kubernetes events学习及应用 CRI——kubelet与容器引擎之间的接口 资源调度泛谈 如何学习Kubernetes 以应用为中心 kubernetes operator kubernetes扩缩容 serverless 泛谈 什么是云原生 自定义CNI IPAM docker和k8s安全访问机制 Kubernetes监控 Kubernetes 控制器模型 Kubernetes资源调度——scheduler Kubernetes类型系统 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 Kubernetes源码分析——从kubectl开始 kubernetes yaml配置 CNI——容器网络是如何打通的 当我在说PaaS时,我在说什么 《深入剖析kubernetes》笔记 Kubernetes存储 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件
agentic chat bert rerank微调 大模型推理tips LLM一些探索 Agent实践 LLM预训练 RAG向量检索与微调 LLM微调实践 RAG与知识图谱 大模型推理服务框架vLLM Agent Functon Calling LLamaIndex入门 Multi-Agent探索 LLM工作流编排 大模型推理服务框架 模型服务化(未完成) 大模型Post-Training 大模型训练 大模型推理 从Attention到Transformer 下一个平台Agent 激发LLM涌现——提示工程 LLM微调理论 大佬沉思 LLM外挂知识库 LLMOps 多模态LLM Transformers源码学习 LangChain源码学习 如何应用LLM 小鼠如何驾驭大象(LLM)? AutoML和AutoDL 特征平台 实时训练 tensorflow原理——python层分析 如何学习tensorflow 数据并行——allreduce 数据并行——ps 推荐系统embedding原理及实践 机器学习中的python调用c 机器学习训练框架概述 tensornet源码分析 大模型训练和推理 X的生成——特征工程 tvm tensorflow原理——core层分析 模型演变 《深度学习推荐系统实战》笔记 keras 和 Estimator tensorflow分布式训练 分布式训练的一些问题 基于Volcano的弹性训练 图神经网络 pytorch弹性分布式训练 从RNN到Attention pytorch分布式训练 CNN 《动手学深度学习》笔记 pytorch与线性回归 推理服务 mpi 学习pytorch 提高gpu 利用率 GPU与容器的结合 GPU入门 AI云平台梳理 tensorflow学习 kaggle泰坦尼克问题实践 神经网络模型优化 概率论 直觉上理解深度学习 如何学习机器学习 深度学习泛谈

go channel

2020年03月27日

前言

Go 对并发的原生支持可不是仅仅停留在口号上的,Go 在语法层面将并发原语 channel 作为一等公民对待。这意味着我们可以像使用普通变量那样使用 channel,比如,定义 channel 类型变量、给 channel 变量赋值、将 channel 作为参数传递给函数 / 方法、将 channel 作为返回值从函数 / 方法中返回,甚至将 channel 发送到其他 channel 中。这就大大简化了 channel 原语的使用,提升了我们开发者在做并发设计和实现时的体验。

通信原语Go 语言中 Channel 与 Select 语句受到 1978 年 CSP 原始理论的启发。 语言设计中,Goroutine 就是 CSP 理论中的并发实体, 而 Channel 则对应 CSP 中输入输出指令的消息信道,Select 语句则是 CSP 中守卫和选择指令的组合。Channel 与 Select 是 Go 语言中提供的语言级的、基于消息传递的同步原语。

深度解密 Go 语言之 channel 未读。

channel

channel是一个单向的、大小有限的管道,在goroutine之间传输类型化的信息。Go还提供了一个多路选择原语select,可以根据某channel上的通信是否可进行来控制执行。Go有提供互斥、条件变量、信号量和原子操作的库,供低级别互斥或同步使用,但channel往往是更好的选择。根据我们的经验,人们对消息传递–利用通信在goroutine之间转移所有权–的理解比对互斥和条件变量的理解更容易、更正确。早期流行的一句Go箴言是:”不要通过共享内存来通信,而是通过通信来共享内存”。

和切片、结构体、map 等一样,channel 也是一种复合数据类型。也就是说,我们在声明一个 channel 类型变量时,必须给出其具体的元素类型,比如var ch chan int。 使用操作符<-,我们还可以声明只发送 channel 类型(send-only)和只接收 channel 类型(recv-only)

使用操作符<-,我们还可以声明只发送 channel 类型(send-only)和只接收 channel 类型(recv-only)

ch1 := make(chan<- int, 1) // 只发送channel类型
ch2 := make(<-chan int, 1) // 只接收channel类型
<-ch1       // invalid operation: <-ch1 (receive from send-only type chan<- int)
ch2 <- 13   // invalid operation: ch2 <- 13 (send to receive-only type <-chan int)

数据结构

Go 语言设计与实现-ChannelChannel 在运行时的内部表示是 runtime.hchan,本质上就是一个 mutex 锁加上一个环状缓存、 一个发送方队列和一个接收方队列。社区有一些无锁Channel 的提案,但还在不停的优化中。PS:本质上是有锁队列,还是共享内存

// src/runtime/chan.go
type hchan struct {
	qcount   uint           // Channel 中的元素个数;
	dataqsiz uint           // Channel 中的循环队列的长度;
	buf      unsafe.Pointer // Channel 的缓冲区数据指针;指向大小为 dataqsiz 的数组
	elemsize uint16         // 当前 Channel 能够收发的元素大小
	closed   uint32
	elemtype *_type         // 当前 Channel 能够收发的元素类型
	sendx    uint           // Channel 的发送操作处理到的位置;
    recvx    uint           // Channel 的接收操作处理到的位置;
	// 就像linux socket的 等待队列一样,方便根据channel找到goroutine
	recvq    waitq          //  recv 等待列表,即( <-ch )
	sendq    waitq			//  send 等待列表,即( ch<- )

	lock mutex
}
type waitq struct { // 等待队列 sudog 双向队列
	first *sudog
	last  *sudog
}

make(chan type, n) => makechan(type, n),makechan 实现的本质是根据需要创建的元素大小, 对 mallocgc 进行封装,因此,Channel 总是在堆上进行分配,它们会被垃圾回收器进行回收, 这也是为什么 Channel 不一定总是需要调用 close(ch) 进行显式地关闭。

发送数据

当我们想要向 Channel 发送数据时,就需要使用 ch <- i 语句,编译器会经过一系列处理后调用runtime.chansend,这个函数负责了发送数据的全部逻辑,如果我们在调用时将 block 参数设置成 true,那么就表示当前发送操作是一个阻塞操作

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// 异常检查,如GC检查等
	...
    lock(&c.lock)	// 为当前 Channel 加锁,保证线程安全
    // 如果 Channel 已经关闭,那么向该 Channel 发送数据时会抛出panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
    }
    // 如果目标 Channel 没有被关闭并且已经有处于读等待的 Goroutine,那么 runtime.chansend 函数会从接收队列 recvq 中取出最先陷入等待的 Goroutine 并直接向它发送数据
    if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
    }
    // 如果创建的 Channel 包含缓冲区并且 Channel 中的数据没有装满,将发送的数据写入 Channel 的缓冲区;
    if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx)
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
    }
    // 当 Channel 没有接收者能够处理数据时,向 Channel 发送数据就会被下游阻塞
    if !block {
		unlock(&c.lock)
		return false
	}
	// 阻塞在 channel 上,等待接收方接收数据
	gp := getg()
	mysg := acquireSudog()
	mysg.elem = ep
	mysg.g = gp
	mysg.c = c
	gp.waiting = mysg
	c.sendq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)    // 触发 Goroutine 让出处理器的使用权
	// 因为调度器在停止当前 g 的时候会记录运行现场,当恢复阻塞的发送操作时候,会从此处继续开始执行
	gp.waiting = nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

发送数据时会调用 runtime.send,该函数的执行可以分成两个部分:

  1. 调用 runtime.sendDirect 函数将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上;其实是一种优化,原因在于,已经处于等待状态的 Goroutine 是没有被执行的, 因此用户态代码不会与当前所发生数据发生任何竞争。我们也更没有必要冗余的将数据写入到缓存, 再让接收方从缓存中进行读取。因此我们可以看到, sendDirect 的调用, 本质上是将数据直接写入接收方的执行栈。
  2. 调用 runtime.goready 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时就会立刻唤醒数据的接收方;

接收数据

i <- ch 						// 当ch被关闭后,n将被赋值为ch元素类型的零值, 无法判定从 ch 返回的元素类型零值,究竟是不是因为 channel 被关闭后才返回的。
i, ok <- ch						// 当ch被关闭后,m将被赋值为ch元素类型的零值, ok值为false
for v := range ch { ... ...}	// 当ch被关闭后,for range循环结束

不同的接收方式会被转换成 runtime.chanrecv1runtime.chanrecv2 两种不同函数的调用,但是这两个函数最终还是会调用 runtime.chanrecv

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 当我们从一个空 Channel 接收数据时会直接调用 runtime.gopark 直接让出处理器的使用权
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
    lock(&c.lock)
    // 如果当前 Channel 已经被关闭并且缓冲区中不存在任何的数据,那么就会清除 ep 指针中的数据并立刻返回。
	if c.closed != 0 && c.qcount == 0 {
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
    }
    // 当存在等待的发送者时,通过 runtime.recv 直接从阻塞的发送者或者缓冲区中获取数据;
    if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
    }
    // 当Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中 recvx 的索引位置中取出数据进行处理
    if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		return true, true
    }
    // 当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会变成阻塞操作,然而不是所有的接收操作都是阻塞的,与 select 语句结合使用时就可能会使用到非阻塞的接收操作
    if !block {
		unlock(&c.lock)
		return false, false
	}
	// 没有数据可以接收,阻塞当前 Goroutine
	gp := getg()
	mysg := acquireSudog()
	mysg.elem = ep
	gp.waiting = mysg
	mysg.g = gp
	mysg.c = c
	c.recvq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
	// 被唤醒
	gp.waiting = nil
	closed := gp.param == nil
	gp.param = nil
	releaseSudog(mysg)
    return true, !closed
}

关闭

  • 关闭未初始化的channel(nil channel)会panic
  • 重复关闭同一channel会panic
  • 向已关闭channel发送消息会panic
  • 从已关闭channel读取数据,不会panic
    • 若还存在数据,会读出未被读取的消息
    • 若无数据,会读出对应数据类型的零值
  • 关闭channel时,所有阻塞在此channel上的接收者,都会立刻从阻塞等待中返回,可以利用此特性,实现广播机制
  • channel也可以使用for-range取值,并且会一直从channel中读取数据,因此不主动break/goto/return的情况下,for循环结束的唯一条件是channel被关闭

编译器会将用于关闭管道的 close 关键字转换成 OCLOSE 节点以及 runtime.closechan 的函数调用。该函数在最后会为所有被阻塞的 Goroutine 调用 runtime.goready 触发调度。PS:所以close 也成了一种 通知的方式。

select

当涉及同时对多个 channel 进行发送 / 接收操作时, Go 为 CSP 并发模型提供的另外一个原语 select一起使用。

func fibonacci(c, quit chan int) {
	x, y := 0, 1
	for {
		select {
		case c <- x:
			x, y = y, x+y
		case <-quit:
			fmt.Println("quit")
            return
        default:
		    println("default")
			break   // select 中的break 只对select 有效,实际上根本无用,无法跳出for 循环
		}
	}
}
  1. select 是一种与 switch 相似的控制结构,与 switch 不同的是,select 中虽然也有多个 case,但是这些 case 中的表达式必须都是 Channel 的收发操作。
  2. 上述控制结构会等待 c <- x 或者 <-quit 两个表达式中任意一个的返回。无论哪一个表达式返回都会立刻执行 case 中的代码,当 select 中的两个 case 同时被触发时,就会随机选择一个 case 执行。
  3. 非阻塞的 Channel :比如,我们只是想看看 Channel 的可读或者可写状态,不希望Channel收发阻塞当前 Goroutine。此时可以为select 添加default 分支,当某次循环 不存在可以收发的 Channel 时,会直接执行 default 中的代码并返回
  4. 实现超时机制
     func worker() {
         select {
         case <-c:
             // ... do some stuff
         case <-time.After(30 *time.Second):
             return
         }
     }
    
  5. 实现心跳
     func worker() {
         heartbeat := time.NewTicker(30 * time.Second)
         defer heartbeat.Stop()
         for {
             select {
             case <-c:
             // ... do some stuff
             case <- heartbeat.C:
             //... do heartbeat stuff
             }
         }
     }
    

数据结构

type scase struct {
    c           *hchan         // 正在操作的channel
    elem        unsafe.Pointer // data element
    kind        uint16
      // ...
}
const (
    caseNil = iota
    caseRecv
    caseSend
    caseDefault
)

实现过程

面向信仰编程-selectC 语言中的 select 关键字可以同时监听多个文件描述符的可读或者可写的状态,Go 语言中的 select 关键字也能够让 Goroutine 同时等待多个 Channel 的可读或者可写,在多个文件或者 Channel 发生状态改变之前,select 会一直阻塞当前线程或者 Goroutine。

runtime 通过遍历+等待的方式实现 select 语义,遍历时判断如果 有可执行的 case 或者 select 中带有 default,那么就执行之。如果没有,就通过 gopark 将调用者转换为等待状态,使用 sudog 链表表示它在多个通道上等待。其中任意一个通道对应的 sudog 都可以唤醒调用者。

与 Channel 同步出现的 Select 更像是一个语法糖, 其本质仍然是一个 chansend 和 chanrecv 的两个通用实现。 但为了支持 Select 在不同分支上的非阻塞操作,selectgo 完成了这一需求。func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) 它的第一个返回值表示需要执行哪个 case, 第 2 个返回值表示如果要执行的 case 是 caseRecv,那么接收数据是否成功。

Select 本身会被编译为 selectgo 调用。这与普通的多个 if 分支不同。 selectgo 则用于随机化每条分支的执行顺序,普通多个 if 分支的执行顺序始终是一致的。编译器会特殊处理 当 Select 语句只有一个分支的情况,即 select 关键字在只有一个分支时,没有被翻译成 selectgo。 只有一个分支的情况下,select 与 if 是没有区别的,这种优化消除了只有一个分支情况下调用 selectgo 的性能开销。

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool){
	...
	// 先遍历一次所有的 case 和 default 语句,看一下是否有可执行的分支,如果有,那么就转移到对应的段去处理。否则就阻塞并且等待被唤醒。
	loop:
		for i := 0; i < ncases; i++ {
			casi = int(pollorder[i])  // pollorder 是伪随机数
        	cas = &scases[casi]
			switch cas.kind {
				case caseNil:
				case caseRecv:
				case caseSend:
				case caseDefault:
			}
		}
	recv:
	bufrecv:
	rclose:
	sclose:
	send:
	bufsend:
	...
	// 上面的流程都执行完了,并且没有default语句,还没有 goto 出去,说明没有任何 case 当前可以执行。那么就挂起并等待被唤醒。
	// 按照锁顺序一次遍历每个 case,然后将其放到 g.waitlink 这个 sudog 链表中,表明是在等待多个 case , 并将当前g 挂到channel的 recvq/sendq中
	gp = getg()
	nextp = &gp.waiting
	for _, casei := range lockorder {
		cas = &scases[casi]
		sg := acquireSudog()
		sg.g = gp
		switch cas.kind {
		case caseRecv:
			c.recvq.enqueue(sg)
		case caseSend:
			c.sendq.enqueue(sg)
		}
	}
	gp.param = nil
	gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
	...
	// 说明被某个 channel 唤醒了
}

背景知识——sudog

在 g 对象中,有一个名字为 waiting 的 sudog* 指针,它表示这个 goroutine 正在等待什么东西或者正在等待哪些东西。sudog 是一个链表形式的类型,waitlink 表示它的下一个节点。

type g struct {
  // ...
  atomicstatus   uint32  // 表示 goroutine 的状态
  param          unsafe.Pointer // 唤醒时参数
  waiting        *sudog // 等待队列,goroutine ready之前,会waiting做一些检查,waiting不为空是不能运行的。
  // ...
}
// sudog is necessary because the g ↔ synchronization object relation is many-to-many. 
type sudog struct {
	// ....
	isSelect bool
	elem     unsafe.Pointer // data element (may point to stack)      
	waitlink    *sudog // g.waiting list or semaRoot
	c           *hchan // channel
	g 			*g
}
// sudogs are allocated from a special pool. Use acquireSudog and releaseSudog to allocate and free them.
func acquireSudog() *sudog {}
func releaseSudog(s *sudog) {}
  1. gopark 将当前的 goroutine 修改成等待状态,然后等待被唤醒。
  2. goready 函数用来唤醒一个 goroutine,它将 goroutine 的状态修改为可运行状态,随后会被调度器运行。
  3. 当被调度时,对应的 gopark 函数返回。

应用

  1. channel 事先创建好
    1. 先启动 N 个 goroutine 消费者,读某个 channel,之后,生产者再在某个时候向 channel 中发送元素
    2. 传递信号,比如用 channel 充当一个 “ready” 的信号,用来指示某个“过程”准备好了,可以接收结果了
  2. 临时创建,就像java 中的future 一样Go channel 的妙用

空结构体类型变量的内存占用为 0。基于空结构体类型内存零开销这样的特性,我们在日常 Go 开发中会经常使用空结构体类型元素,作为一种“事件”信息进行 Goroutine 之间的通信

var c = make(chan struct{}) // 声明一个元素类型为Empty的channel
c<-struct{}               	// 向channel写入一个“事件”

带缓冲 channel 的惯用法

  1. 用作消息队列
  2. 用作计数信号量

一文带你搞懂从单队列到优先级队列的实现

使用无缓冲 channel 替代锁

这种并发设计逻辑更符合 Go 语言所倡导的“不要通过共享内存来通信,而是通过通信来共享内存”的原则。PS: 好像没有使用锁好懂

type counter struct {
    c chan int
    i int
}
func NewCounter() *counter {
    cter := &counter{
        c: make(chan int),
    }
    go func() {	// 计数生产者
        for {
            cter.i++
            cter.c <- cter.i
        }
    }()
    return cter
}
func (cter *counter) Increase() int {
    return <-cter.c
}
func main() {
    cter := NewCounter()
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            v := cter.Increase()
            fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

广播channel

channels在多个writer,一个reader的模型下面工作的很好,但是却不能很容易的处理多个reader等待获取一个writer发送的数据的情况。处理这样的情况,可能的一个go api原型如下:

type Broadcaster interface{
    func NewBroadcaster() Broadcaster
    func (b Broadcaster) Write(v interface{})
    func (b Broadcaster) Listen() chan interface{}
}

broadcast channel通过NewBroadcaster创建,通过Write函数进行数据广播。为了监听这个channel的信息,我们使用Listen,该函数返回一个新的channel去接受Write发送的数据。这套解决方案需要一个中间process用来处理所有reader的注册。当调用Listen创建新的channel之后,该channel就被注册,通常该中间process的主循环如下:

for {
    select {
        case v := <-inc:
            for _, c := range(listeners) {
                c <- v
            }
        case c := <- registeryc:
            listeners.push(c)
    }
}

这是一个通常的做法,但是该process在处理数据广播的时候会阻塞,直到所有的readers读取到值。一个可选的解决方式就是reader的channel是有buffer缓冲的,缓冲大小我们可以按需调节。或者当buffer满的时候我们将数据丢弃。

SierraSoftworks/multicast 解决了这个问题,The multicast module provides single-writer, multiple-reader semantics around Go channels. It attempts to maintain semantics similar to those offered by standard Go channels while guaranteeing parallel delivery (slow consumers won’t hold up delivery to other listeners) and guaranteeing delivery to all registered listeners when a message is published.

示例代码

import (
    "fmt"
    "github.com/SierraSoftworks/multicast"
)
func main() {
    c := multicast.New()
	go func() {
		l := c.Listen()
		for msg := range l.C {
			fmt.Printf("Listener 1: %s\n", msg)
		}
        fmt.Println("Listener 1 Closed")
	}()
	go func() {
		l := c.Listen()
		for msg := range l.C {
			fmt.Printf("Listener 2: %s\n", msg)
		}
        fmt.Println("Listener 2 Closed")
	}()
    // 据笔者实践,此处最好加上  time.Sleep(1000)
	c.C <- "Hello World!"
	c.Close()
}