技术

Python并发 Agent Functon Calling LLamaIndex入门 Multi-Agent探索 Python虚拟机 LLM工作流编排 Python实践 下一个平台Agent 激发LLM涌现——提示工程 LLM微调理论及实践 大佬沉思 LLM外挂知识库 LLMOps 多模态LLM Python一些比较有意思的库 Transformers源码学习 LangChain源码学习 通用分布式计算引擎Ray Python并发 go依赖注入 go collection gc的基本原理 golang性能分析及优化 数据湖 高性能计算与存储 Linux2.1.13网络源代码学习 《大数据经典论文解读》 三驾马车学习 Spark 内存管理及调优 Yarn学习 从Spark部署模式开始讲源码分析 容器狂占内存资源怎么办? 多角度理解一致性 golang io使用及优化模式 Flink学习 c++学习 学习ebpf go设计哲学 ceph学习 学习mesh kvm虚拟化 学习MQ go编译器以及defer实现 学习go 为什么要有堆栈 汇编语言 计算机组成原理 运行时和库 Prometheus client mysql 事务 mysql 事务的隔离级别 mysql 索引 坏味道 学习分布式 学习网络 学习Linux go堆内存分配 golang 系统调用与阻塞处理 Goroutine 调度过程 重新认识cpu mosn有的没的 负载均衡泛谈 单元测试的新解读 《Redis核心技术与实现》笔记 《Prometheus监控实战》笔记 Prometheus 告警学习 calico源码分析 对容器云平台的理解 Prometheus 源码分析 并发的成本 基础设施优化 hashicorp raft源码学习 docker 架构 mosn细节 与微服务框架整合 Java动态代理 编程范式 并发通信模型 《网络是怎样连接的》笔记 go channel codereview gc分析 jvm 线程实现 go打包机制 go interface及反射 如何学习Kubernetes 《编译原理之美》笔记——后端部分 《编译原理之美》笔记——前端部分 Pilot MCP协议分析 go gc 内存管理玩法汇总 软件机制 istio流量管理 Pilot源码分析 golang io 学习Spring mosn源码浅析 MOSN简介 《datacenter as a computer》笔记 学习JVM Tomcat源码分析 Linux可观测性 学习存储 学计算 Gotty源码分析 kubernetes operator kaggle泰坦尼克问题实践 kubernetes扩缩容 神经网络模型优化 直觉上理解深度学习 如何学习机器学习 TIDB源码分析 什么是云原生 Alibaba Java诊断工具Arthas TIDB存储——TIKV 《Apache Kafka源码分析》——简介 netty中的线程池 guava cache 源码分析 Springboot 启动过程分析 Spring 创建Bean的年代变迁 Linux内存管理 自定义CNI IPAM 共识算法 spring redis 源码分析 kafka实践 spring kafka 源码分析 Linux进程调度 让kafka支持优先级队列 Codis源码分析 Redis源码分析 C语言学习 《趣谈Linux操作系统》笔记 docker和k8s安全访问机制 jvm crash分析 Prometheus 学习 Kubernetes监控 Kubernetes 控制器模型 容器日志采集 容器狂占资源怎么办? Kubernetes资源调度——scheduler 时序性数据库介绍及对比 influxdb入门 maven的基本概念 《Apache Kafka源码分析》——server Kubernetes类型系统 源码分析体会 《数据结构与算法之美》——算法新解 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 ansible学习 Kubernetes源码分析——从kubectl开始 jib源码分析之Step实现 线程排队 jib源码分析之细节 跨主机容器通信 jib源码分析及应用 为容器选择一个合适的entrypoint kubernetes yaml配置 《持续交付36讲》笔记 mybatis学习 程序猿应该知道的 无锁数据结构和算法 CNI——容器网络是如何打通的 为什么很多业务程序猿觉得数据结构和算法没用? 串一串一致性协议 当我在说PaaS时,我在说什么 《数据结构与算法之美》——数据结构笔记 PouchContainer技术分享体会 harbor学习 用groovy 来动态化你的代码 精简代码的利器——lombok 学习 《深入剖析kubernetes》笔记 编程语言那些事儿 rxjava3——背压 rxjava2——线程切换 spring cloud 初识 《深入拆解java 虚拟机》笔记 《how tomcat works》笔记 hystrix 学习 rxjava1——概念 Redis 学习 TIDB 学习 如何分发计算 Storm 学习 AQS1——论文学习 Unsafe Spark Stream 学习 linux vfs轮廓 《自己动手写docker》笔记 java8 实践 中本聪比特币白皮书 细读 区块链泛谈 比特币 大杂烩 总纲——如何学习分布式系统 hbase 泛谈 forkjoin 泛谈 看不见摸不着的cdn是啥 《jdk8 in action》笔记 程序猿视角看网络 bgp初识 calico学习 AQS——粗略的代码分析 我们能用反射做什么 web 跨域问题 《clean code》笔记 《Elasticsearch权威指南》笔记 mockito简介及源码分析 2017软件开发小结—— 从做功能到做系统 《Apache Kafka源码分析》——clients dns隐藏的一个坑 《mysql技术内幕》笔记 log4j学习 为什么netty比较难懂? 递归、回溯、动态规划 apollo client源码分析及看待面向对象设计 学习并发 docker运行java项目的常见问题 OpenTSDB 入门 spring事务小结 分布式事务 javascript应用在哪里 《netty in action》读书笔记 netty对http2协议的解析 ssl证书是什么东西 http那些事 苹果APNs推送框架pushy apple 推送那些事儿 编写java框架的几大利器 java内存模型和jvm内存布局 java exception Linux IO学习 netty内存管理 测试环境docker化实践 netty在框架中的使用套路 Nginx简单使用 《Linux内核设计的艺术》小结 Go并发机制及语言层工具 Linux网络源代码学习——数据包的发送与接收 《docker源码分析》小结 docker namespace和cgroup zookeeper三重奏 数据库的一些知识 Spark 泛谈 链式处理的那些套路 netty回顾 Thrift基本原理与实践(二) Thrift基本原理与实践(一) 回调 异步执行抽象——Executor与Future Docker0.1.0源码分析 java gc Jedis源码分析 深度学习泛谈 Linux网络命令操作 JTA与TCC 换个角度看待设计模式 Scala初识 向Hadoop学习NIO的使用 以新的角度看数据结构 并发控制相关的硬件与内核支持 systemd 简介 quartz 源码分析 基于docker搭建测试环境(二) spring aop 实现原理简述 自己动手写spring(八) 支持AOP 自己动手写spring(七) 类结构设计调整 分析log日志 自己动手写spring(六) 支持FactoryBean 自己动手写spring(九) 总结 自己动手写spring(五) bean的生命周期管理 自己动手写spring(四) 整合xml与注解方式 自己动手写spring(三) 支持注解方式 自己动手写spring(二) 创建一个bean工厂 自己动手写spring(一) 使用digester varnish 简单使用 关于docker image的那点事儿 基于docker搭建测试环境 分布式配置系统 JVM执行 git maven/ant/gradle/make使用 再看tcp kv系统 java nio的多线程扩展 《Concurrency Models》笔记 回头看Spring IOC IntelliJ IDEA使用 Java泛型 vagrant 使用 Go常用的一些库 Python初学 Goroutine 调度模型 虚拟网络 《程序员的自我修养》小结 Kubernetes存储 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件 Go基础 JVM类加载 硬币和扑克牌问题 LRU实现 virtualbox 使用 ThreadLocal小结 docker快速入门

架构

dddfirework源码分析 RAG与知识图谱 大模型推理服务框架vLLM 大模型推理服务框架 模型服务化(未完成) 大模型RHLF 大模型训练 大模型推理 从Attention到Transformer k8s设备管理 LLM工具栈 ddd从理念到代码 如何应用LLM 小鼠如何驾驭大象(LLM)? 多类型负载协调员Koordinator controller-runtime细节分析 finops学习 kubevela多集群 kubevela中cue的应用 基于k8s的工作流 kubevela源码分析 容器和CPU那些事儿 数据集管理fluid 应用管理平台kubevela karmada支持crd 多集群管理 AutoML和AutoDL 特征平台 实时训练 分布式链路追踪 helm tensorflow原理——python层分析 如何学习tensorflow 数据并行——allreduce 数据并行——ps embedding的原理及实践 机器学习中的python调用c 机器学习训练框架概述 tensornet源码分析 大模型训练和推理 X的生成——特征工程 tvm tensorflow原理——core层分析 模型演变 《深度学习推荐系统实战》笔记 keras 和 Estimator tensorflow分布式训练 分布式训练的一些问题 基于Volcano的弹性训练 图神经网络 pytorch弹性分布式训练 从混部到统一调度 从RNN到Attention pytorch分布式训练 CNN 《动手学深度学习》笔记 pytorch与线性回归 多活 volcano特性源码分析 推理服务 kubebuilder 学习 mpi 学习pytorch client-go学习 提高gpu 利用率 GPU与容器的结合 GPU入门 AI云平台梳理 tensorflow学习 tf-operator源码分析 k8s批处理调度/Job调度 喜马拉雅容器化实践 Kubernetes 实践 学习rpc BFF openkruise学习 可观察性和监控系统 基于Kubernetes选主及应用 《许式伟的架构课》笔记 Admission Controller 与 Admission Webhook 发布平台系统设计 k8s水平扩缩容 Scheduler如何给Node打分 Scheduler扩展 深入controller openkruise cloneset学习 controller-runtime源码分析 pv与pvc实现 csi学习 client-go informer源码分析 kubelet 组件分析 调度实践 Pod是如何被创建出来的? 《软件设计之美》笔记 mecha 架构学习 Kubernetes events学习及应用 CRI 资源调度泛谈 业务系统设计原则 grpc学习 元编程 以应用为中心 istio学习 下一代微服务Service Mesh 《实现领域驱动设计》笔记 概率论 serverless 泛谈 《架构整洁之道》笔记 处理复杂性 那些年追过的并发 服务器端编程 网络通信协议 架构大杂烩 如何学习架构 《反应式设计模式》笔记 项目的演化特点 反应式架构摸索 函数式编程的设计模式 服务化 ddd反模式——CRUD的败笔 研发效能平台 重新看面向对象设计 业务系统设计的一些体会 函数式编程 《左耳听风》笔记 业务程序猿眼中的微服务管理 DDD实践——CQRS 项目隔离——案例研究 《编程的本质》笔记 系统故障排查汇总及教训 平台支持类系统的几个点 代码腾挪的艺术 abtest 系统设计汇总 《从0开始学架构》笔记 初级权限系统设计 领域驱动理念 现有上传协议分析 移动网络下的文件上传要注意的几个问题 推送系统的几个基本问题 做配置中心要想好的几个基本问题 不同层面的异步 分层那些事儿 性能问题分析 用户认证问题 资源的分配与回收——池 消息/任务队列

标签

k8s设备管理 多类型负载协调员Koordinator controller-runtime细节分析 finops学习 kubevela多集群 kubevela中cue的应用 基于k8s的工作流 kubevela源码分析 容器和CPU那些事儿 数据集管理fluid 应用管理平台kubevela karmada支持crd 多集群管理 helm 从混部到统一调度 volcano特性源码分析 kubebuilder 学习 client-go学习 tf-operator源码分析 k8s批处理调度/Job调度 喜马拉雅容器化实践 Kubernetes 实践 openkruise学习 基于Kubernetes选主及应用 Admission Controller 与 Admission Webhook k8s水平扩缩容 Scheduler如何给Node打分 Scheduler扩展 深入controller openkruise cloneset学习 controller-runtime源码分析 pv与pvc实现 csi学习 client-go informer源码分析 kubelet 组件分析 调度实践 Pod是如何被创建出来的? Kubernetes events学习及应用 CRI 资源调度泛谈 如何学习Kubernetes 以应用为中心 kubernetes operator kubernetes扩缩容 serverless 泛谈 什么是云原生 自定义CNI IPAM docker和k8s安全访问机制 Kubernetes监控 Kubernetes 控制器模型 Kubernetes资源调度——scheduler Kubernetes类型系统 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 Kubernetes源码分析——从kubectl开始 kubernetes yaml配置 CNI——容器网络是如何打通的 当我在说PaaS时,我在说什么 《深入剖析kubernetes》笔记 Kubernetes存储 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件

LLM工作流编排

2024年05月16日

从顺序式为主的简单架构走向复杂的WorkFlow

编程语言大类上可以分为命令式编程和声明式编程,前者深入细节,各种 if else、各种 while/for,程序员掌控每个像素;后者把任务「描述」清楚,重点在业务流程翻译成所用的语言上,具体怎么实现甩给别人(大部分是系统自带)。由于这一波 LLMs 强大的理解、生成能力,关注细节的命令式编程似乎不再需要,而偏重流程或者说业务逻辑编排的 pipeline 能力的声明式编程,成了主流「编程」方式。

RAG 流程是指在 RAG 系统中,从输入查询到输出生成文本的整个工作流程。这个流程通常涉及多个模块和操作符的协同工作,包括但不限于检索器、生成器以及可能的预处理和后处理模块。RAG 流程的设计旨在使得 LLM(大语言模型)能够在生成文本时利用外部知识库或文档集,从而提高回答的准确性和相关性。推理阶段的RAG Flow分成四种主要的基础模式:顺序、条件、分支与循环。PS: 一个llm 业务有各种基本概念,prompt/llm/memory,整个工作流产出一个流式输出,处理链路上包含多个step,且step有复杂的关系(顺序、条件、分支与循环)。一个llm 业务开发的核心就是个性化各种原子能力 以及组合各种原子能力。

以一个RAG Agent 的工作流程为例

  1. 根据问题,路由器决定是从向量存储中检索上下文还是进行网页搜索。
  2. 如果路由器决定将问题定向到向量存储以进行检索,则从向量存储中检索匹配的文档;否则,使用 tavily-api 进行网页搜索。
  3. 文档评分器然后将文档评分为相关或不相关。
  4. 如果检索到的上下文被评为相关,则使用幻觉评分器检查是否存在幻觉。如果评分器决定响应缺乏幻觉,则将响应呈现给用户。
  5. 如果上下文被评为不相关,则进行网页搜索以检索内容。
  6. 检索后,文档评分器对从网页搜索生成的内容进行评分。如果发现相关,则使用 LLM 进行综合,然后呈现响应。

高级 RAG 检索策略之流程与模块化业界一个共识是RAG的演进:Naive RAG ==> Advanced RAG ==> Modular RAG。要落地Modular RAG,便是定义模块以及将模块串起来的Pipeline。比如LlamaIndex 的探索。PS: pipeline/add_modules/add_link

retriever =  index.as_retriever()
p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "input": InputComponent(),
        "retriever": retriever,
        "output": SimpleSummarize(),
    }
)
p.add_link("input", "retriever")
p.add_link("input", "output", dest_key="query_str")
p.add_link("retriever", "output", dest_key="nodes")

完整的流水线

evaluator = RagasComponent()
p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "input": InputComponent(),
        "query_rewriter": query_rewriter,
        "retriever": retriever,
        "meta_replacer": meta_replacer,
        "reranker": reranker,
        "output": TreeSummarize(),
        "evaluator": evaluator,
    }
)
p.add_link("input", "query_rewriter")
p.add_link("input", "query_rewriter", src_key="input")
p.add_link("query_rewriter", "retriever")
p.add_link("retriever", "meta_replacer")
p.add_link("input", "reranker", dest_key="query_str")
p.add_link("input", "reranker", src_key="input", dest_key="query_str")
p.add_link("meta_replacer", "reranker", dest_key="nodes")
p.add_link("input", "output", dest_key="query_str")
p.add_link("input", "output", src_key="input", dest_key="query_str")
p.add_link("reranker", "output", dest_key="nodes")
p.add_link("input", "evaluator", src_key="input", dest_key="question")
p.add_link("input", "evaluator", src_key="ground_truth", dest_key="ground_truth")
p.add_link("reranker", "evaluator", dest_key="nodes")
p.add_link("output", "evaluator", dest_key="answer")

LCEL

在 LangChain 里只要实现了Runnable接口,并且有invoke方法,都可以成为链。实现了Runnable接口的类,可以拿上一个链的输出作为自己的输入。

langchain入门3-LCEL核心源码速通LCEL实际上是langchain定义的一种DSL,可以方便的将一系列的节点按声明的顺序连接起来,实现固定流程的workflow编排。LCEL语法的核心思想是:一切皆为对象,一切皆为链。这意味着,LCEL语法中的每一个对象都实现了一个统一的接口:Runnable,它定义了一系列的调用方法(invoke, batch, stream, ainvoke, …)。这样,你可以用同样的方式调用不同类型的对象,无论它们是模型、函数、数据、配置、条件、逻辑等等。而且,你可以将多个对象链接起来,形成一个链式结构,这个结构本身也是一个对象,也可以被调用。这样,你可以将复杂的功能分解成简单的组件,然后用LCEL语法将它们组合起来,形成一个完整的应用。

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
import os
os.environ["OPENAI_API_BASE"] = "http://xx:8000/v1"
os.environ["OPENAI_API_KEY"] = "EMPTY"
prompt = ChatPromptTemplate.from_template("tell me a short joke about {topic}")
output_parser = StrOutputParser()
model = ChatOpenAI(model="moonshot-v1-8k")
chain = prompt | model | output_parser
answer = chain.invoke({"topic": "ice cream"})
print(answer)
# 调用笑话对象,传入一个主题字符串,得到一个笑话字符串的流
chain.stream("dog")

chain = prompt | model | output_parser。我们可以看到这段代码中使用了运算符|,熟悉python的同学肯定知道,这里使用了python的magic method,也就是说它一定具有__or__函数。prompt | model就相当于prompt.__or__(model)。实际上,prompt实现了__ror__,这个magic method支持从右往左的or运算,dict|prompt,相当于prompt.__ror__(dict)

Component Input Type Output Type
Prompt Dictionary PromptValue
ChatModel Single string, list of chat messages or a PromptValue ChatMessage
LLM Single string, list of chat messages or a PromptValue String
OutputParser The output of an LLM or ChatModel Depends on the parser
Retriever Single string List of Documents
Tool Single string or dictionary, depending on the tool Depends on the tool

模块化抽象Runnable

我们使用的所有LCEL相关的组件都继承自RunnableSerializable,RunnableSequence 顾名思义就按顺序执行的Runnable,分为两部分Runnable和Serializable。其中Serializable是继承自Pydantic的BaseModel。(py+pedantic=Pydantic,是非常流行的参数验证框架)Serializable提供了,将Runnable序列化的能力。而Runnable,则是LCEL组件最重要的一个抽象类,它有几个重要的抽象方法。

class Runnable(Generic[Input, Output], ABC):
    @abstractmethod
    def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:

Runnable所有接口都接收可选的配置参数,可用于配置执行、添加标签和元数据,以进行跟踪和调试。

  1. invoke/ainvoke: 单个输入转为输出。
  2. batch/abatch:批量转换。
  3. stream/astream: 单个流式处理。PS: 如果没有这个,只能通过callbackhandler.on_llm_new_token 获取llm的吐字了
     chunks = []
     async for chunk in model.astream("你好。告诉我一些关于你自己的事情"):
         chunks.append(chunk)
         print(chunk.content, end="|", flush=True)
    
  4. stream_events:从输入流流式获取结果与中间步骤。 PS: 有点替换 callbackhandler 的味道。

     async for event in model.astream_events("hello", version="v1"):
         kind = event["event"]
         if kind == "on_chat_model_stream":
             ...
         if kind == "on_parser_stream":
             ...
    

有时我们希望 使用常量参数调用Runnable 调用链中的Runnable对象,这些常量参数不是序列中前一个Runnable 对象输出的一部分,也不是用户输入的一部分,我们可以使用Runnable.bind 方法来传递这些参数。

同时Runnbale也实现了两个重要的magic method ,就是前面说的用于支持管道操作符 __or____ror__。Runnable之间编排以后,会生成一个RunnableSequence。
class Runnable(Generic[Input, Output], ABC):
    def __or__(
        self,
        other: Union[
            Runnable[Any, Other],
            Callable[[Any], Other],
            Callable[[Iterator[Any]], Iterator[Other]],
            Mapping[str, Union[Runnable[Any, Other], Callable[[Any], Other], Any]],
        ],
    ) -> RunnableSerializable[Input, Other]:
        """Compose this runnable with another object to create a RunnableSequence."""
        return RunnableSequence(self, coerce_to_runnable(other))

    def __ror__(
        self,
        other: Union[
            Runnable[Other, Any],
            Callable[[Other], Any],
            Callable[[Iterator[Other]], Iterator[Any]],
            Mapping[str, Union[Runnable[Other, Any], Callable[[Other], Any], Any]],
        ],
    ) -> RunnableSerializable[Other, Output]:
        """Compose this runnable with another object to create a RunnableSequence."""
        return RunnableSequence(coerce_to_runnable(other), self)

Runnable 对象表示一个可调用的函数或操作单元,RunnableSequence 可以看成由lcel 构建的调用链的实际载体。如果我们运行最终编排好的Chain,例如chain.invoke({“topic”: “ice cream”}),实际上就是执行了RunnableSequence的invoke。那我们先来看看invoke函数。

# config对象,可以设置一些并发数、标签等等配置,默认情况下为空。
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:  
    from langchain_core.beta.runnables.context import config_with_context  
  
    # 根据上下文补充config
    config = config_with_context(ensure_config(config), self.steps)  
    # 创建回调管理器,用于支持运行中产生的各种回调
    callback_manager = get_callback_manager_for_config(config)  
    # 创建运行管理器,用于处理异常重试,结束等情况
    run_manager = callback_manager.on_chain_start(  
        dumpd(self), input, name=config.get("run_name") or self.get_name()  
    )  
	# !!关键内容!!
    # 调用整个链
    try:  
	    # 顺序执行step,每一步的输出,将作为下一步的输入
        for i, step in enumerate(self.steps):  
            input = step.invoke(  
                input,  
                # 为下一个step更新config 
                patch_config(  
                    config, callbacks=run_manager.get_child(f"seq:step:{i+1}")  
                ),  
            )  
    # finish the root run  
    except BaseException as e:  
        run_manager.on_chain_error(e)  
        raise  
    else:  
        run_manager.on_chain_end(input)  
        return cast(Output, input)

Runnable 还有很多增强/装饰方法,对underlying runnable 增加一些配置、逻辑得到一个新的Runnable,以openai为例,底层实质扩充了openai.ChatCompletion.create 前后逻辑或调用参数

class Runnable(Generic[Input, Output], ABC):
    def assign(self,**kwargs)-> RunnableSerializable[Any, Any]:
        return self | RunnableAssign(RunnableParallel(kwargs))
    # Bind kwargs to pass to the underlying runnable when running it.
    def bind(self, **kwargs: Any) -> Runnable[Input, Output]:
        return RunnableBinding(bound=self, kwargs=kwargs, config={})
    # Bind config to pass to the underlying runnable when running it.
    def with_config(self,config: Optional[RunnableConfig] = None,**kwargs: Any,) -> Runnable[Input, Output]:
        return RunnableBinding(...)
    # Bind a retry policy to the underlying runnable.
    def with_retry(self,retry_if_exception_type,...) -> Runnable[Input, Output]:
        return RunnableRetry(...)
    # Bind a fallback policy to the underlying runnable.
    def with_fallbacks(self,fallbacks,...)-> RunnableWithFallbacksT[Input, Output]:
        return RunnableWithFallbacks(self,fallbacks,...)

Runnable串联

def add_one(x: int) -> int:
    return x + 1
def mul_two(x: int) -> int:
    return x * 2
runnable_1 = RunnableLambda(add_one) # RunnableLambda 可以把一个Callable类转成Runnable类(python所有可调用对象都是Callable 类型),从而可以将你自定义的函数集成到chain中
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1 | runnable_2
sequence.invoke(1)

def mul_three(x: int) -> int:
    return x * 3
sequence = runnable_1 | {  # Runnable对象的列表或字典/this dict is coerced to a RunnableParallel
    "mul_two": runnable_2,
    "mul_three": runnable_3,
}
sequence.invoke(1) # 会输出一个dict {'mul_two':4, 'mul_three':6}

branch = RunnableBranch(
    (lambda x: isinstance(x, str), lambda x: x.upper()),
    (lambda x: isinstance(x, int), lambda x: x + 1),
    (lambda x: isinstance(x, float), lambda x: x * 2),
    lambda x: "goodbye",
)
branch.invoke("hello") # "HELLO"
branch.invoke(None) # "goodbye"

RunnableParallel 的使用可以有以下三种形式,三种形式等价:

{"context": retriever, "question": RunnablePassthrough()}
RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
RunnableParallel(context=retriever, question=RunnablePassthrough())

在使用LCEL构建链时,原始用户输入可能不仅要传给第一个组件,还要传给后续组件,这时可以用RunnablePassthrough。RunnablePassthrough可以透传用户输入。

# 用户输入的问题,不止组件1的检索器要用,组件2也要用它来构建提示词,因此组件1使用RunnablePassthrough方法把原始输入透传给下一步。
chain = (
    # 由于组件2 prompt的输入要求是字典类型,所以组件1把检索器和用户问题写成字典格式,并用组件2的变量作为键。
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

两个Runnable 对象之间(很多时候是第一个)需要一些数据处理、转换、透传的逻辑

  1. 在构建复杂的RunnableSequence时,我们可能需要将很多信息从上游传递到下游,此时可以用到RunnablePassthrough。此外,还可以使用RunnablePassthrough.assign 方法在透传上游数据的同时添加一些新的的数据。
  2. RunnableMap,底层是RunnableParallel,通常以一个dict 结构出现,value 是一个Runnable对象,lcel 会并行的调用 value部分的Runnable对象 并将其返回值填充dict,之后将填充后的dict 传递给RunnableSequence 的下一个Runnable对象。

目前Memory模块还是Beta版本,创建带Memory功能的Chain,并不能使用统一的LCEL语法。但是,LangChain提供了工具类RunnableWithMessageHistory,支持了为Chain追加History的能力,从某种程度上缓解了上述问题。不过需要指定Lambda函数get_session_history以区分不同的会话,并需要在调用时通过config参数指定具体的会话ID。

llm = xx
prompt =  xx
chain = prompt | llm | output_parser
history = ChatMessageHistory()
chain_with_history = RunnableWithMessageHistory(
    chain,
    lambda session_id: history,
    input_messages_key="question",
    history_messages_key="chat_history",
)

LCEL提供了多种优势,例如一流的流支持、异步支持、优化的并行执行、支持重试和回退、访问中间结果、输入和输出模式以及无缝 LangSmith 跟踪集成。但因为语法上的问题,要实现 loop 和 condition 的情况就比较困难。于是LangChain社区推出了一个新的项目——LangGraph,期望基于LangChain构建支持循环和跨多链的计算图结构,以描述更复杂的,甚至具备自动化属性的AI工程应用逻辑,比如智能体应用。

LangGraph

彻底搞懂LangGraph:构建强大的Multi-Agent多智能体应用的LangChain新利器 相对于Chain.invoke()直接运行,Agent_executor的作用就是为了能够实现多次循环ReAct的动作,以最终完成任务。为什么需要将循环引入运行时呢?考虑一个增强的RAG应用:我们可以对语义检索出来的关联文档(上下文)进行评估:如果评估的文档质量很差,可以对检索的问题进行重写(Rewrite,比如把输入的问题结合对话历史用更精确的方式来表达),并把重写结果重新交给检索器,检索出新的关联文档,这样有助于获得更精确的结果。这里把Rewrite的问题重新交给检索器,就是一个典型的“循环”动作。而在目前LangChain的简单链中是无法支持的。其他一些典型的依赖“循环”的场景包括:代码生成时的自我纠正:当借助LLM自动生成软件代码时,根据代码执行的结果进行自我反省,并要求LLM重新生成代码;Web访问自动导航:每当进入下一界面时,需要借助多模态模型来决定下一步的动作(点击、滚动、输入等),直至完成导航。

那么,如果我们需要在循环中调用LLM能力,就需要借助于AgentExecutor,将Agent 置于一个循环执行环境(PS:所谓自主运行就是靠循环)。其调用的过程主要就是两个步骤:

  1. 通过大模型来决定采取什么行动,使用什么工具,或者向用户输出响应(如运行结束时);
  2. 执行1步骤中的行动,比如调用某个工具,并把结果继续交给大模型来决定,即返回步骤1; 这里的AgentExecute存在的问题是:过于黑盒,所有的决策过程隐藏在AgentExecutor背后,缺乏更精细的控制能力,在构建复杂Agent的时候受限。这些精细化的控制要求比如:
  3. 某个Agent要求首先强制调用某个Tool
  4. 在 Agent运行过程中增加人机交互步骤
  5. 能够灵活更换Prompt或者背后的LLM
  6. 多Agent(Multi-Agent)智能体构建的需求,即多个Agent协作完成任务的场景支持。 所以,让我们简单总结LangGraph诞生的动力:LangChain简单的链(Chain)不具备“循环”能力;而AgentExecutor调度的Agent运行又过于“黑盒”。因此需要一个具备更精细控制能力的框架来支持更复杂场景的LLM应用。

LangGraph的实现方式是把之前基于AgentExecutor的黑盒调用过程用一种新的形式来构建:状态图(StateGraph)。把基于LLM的任务(比如RAG、代码生成等)细节用Graph进行精确的定义(定义图的节点与边),最后基于这个图来编译生成应用;在任务运行过程中,维持一个中央状态对象(state),会根据节点的跳转不断更新,状态包含的属性可自行定义。

构建Agent

lcel示例

tools: Sequence[BaseTool] = xx
# A Runnable sequence representing an agent. It takes as input all the same input variables as the prompt passed in does. It returns as output either an AgentAction or AgentFinish.
agent = (
    RunnablePassthrough.assign(
        agent_scratchpad=lambda x: format_log_to_str(x["intermediate_steps"]),
    )
    | prompt
    | llm_with_stop
    | JSONAgentOutputParser()
)
executor = AgentExecutor(agent=agent, tools=tools, callbacks=callbacks)

AgentExecutor初始化的时候,如果发现agent 是一个Runable,则会将其转为RunnableAgent。Agent输入输出比较明确

  1. 输入 prompt hwchase17/structured-chat-agent 和 intermediate_steps
  2. 输出 AgentAction 和 AgentFinish
    class RunnableAgent(BaseSingleActionAgent): 
     def plan(
         self,
         intermediate_steps: List[Tuple[AgentAction, str]],
         callbacks: Callbacks = None,
         **kwargs: Any,
     ) -> Union[AgentAction, AgentFinish]:
         ...
    

    AgentExecutor 循环执行Agent.plan,Agent.plan返回AgentAction,之后AgentExecutor 执行AgentAction.tool 得到observation, AgentExecutor 会将action和observation包装为 AgentStep 塞到临时变量intermediate_steps里,并在下次执行Agent.plan时塞给Agent.plan。AgentExecutor 驱动 Agent.plan 和 tool.run 一般相对固定,个性化逻辑主要是 Agent,主要体现在个性化 prompt 及tool description。

langgraph示例

langgraph正在成为构建Agent的推荐方式。

一个最基础的ReAct范式的Agent应用对应的Graph如下:

简单的实现代码如下(省略了部分细节):

# 定义一个Graph,传入state定义(参考上图state属性)
workflow = StateGraph(AgentState)

# 两个节点
#节点1: 推理节点,调用LLM决定action,省略了runreason细节
workflow.add_node("reason", run_reason)

#节点2: 行动节点,调用tools执行action,省略executetools细节
workflow.add_node("action", execute_tools)
#入口节点:总是从推理节点开始
workflow.set_entry_point("reason")
#条件边:根据推理节点的结果决定下一步
workflow.add_conditional_edges(
    "reason",
    should_continue, #条件判断函数(自定义,根据状态中的推理结果判断)
    {
        "continue": "action", #如果条件函数返回continue,进action节点
        "end": END, #如果条件函数返回end,进END节点
    },
)
#普通边:action结束后,总是返回reason
workflow.add_edge("action", "reason")
#编译成app
app = workflow.compile()
#可以调用app了,并使用流式输出
inputs = {"input": "you task description", "chat_history": []}
for s in app.stream(inputs):
    print(list(s.values())[0])
    print("----")

LangGraph原理

LangGraph 三个核心要素

  1. StateGraph,LangGraph 在图的基础上增添了全局状态变量,是一组键值对的组合,可以被整个图中的各个节点访问与更新,从而实现有效的跨节点共享及透明的状态维护。它将该对象传递给每个节点。然后,节点会以键值对的形式,返回对状态属性的操作。这些操作可以是在状态上设置特定属性(例如,覆盖现有值)或者添加到现有属性。
  2. 在创建了StateGraph之后,我们需要向其中添加Nodes(节点)。添加节点是通过graph.add_node(name, value)语法来完成的。其中,name参数是一个字符串,用于在添加边时引用这个节点。value参数应该可以是函数或runnable 接口,它们将在节点被调用时执行。其输入应为状态图的全局状态变量,在执行完毕之后也会输出一组键值对,字典中的键是State对象中要更新的属性。说白了,Nodes(节点)的责任是“执行”,在执行完毕之后会更新StateGraph的状态。
  3. 节点通过边相互连接,形成了一个有向无环图(DAG),边有几种类型:
    1. Normal Edges:即确定的状态转移,这些边表示一个节点总是要在另一个节点之后被调用。
    2. Conditional Edges:输入是一个节点,输出是一个mapping,连接到所有可能的输出节点,同时附带一个判断函数(输入是StateGraph,输出是Literal),根据全局状态变量的当前值判断流转到哪一个输出节点上,以充分发挥大语言模型的思考能力。

当我们使用这三个核心要素构建图之后,通过图对象的compile方法可以将图转换为一个 Runnable对象(Runnable也有Runnable.get_graph 转为Graph对象),之后就能使用与lcel完全相同的接口调用图对象。

class Graph:
    def __init__(self) -> None:
        self.nodes: dict[str, Runnable] = {}
        self.edges = set[tuple[str, str]]()
        self.branches: defaultdict[str, dict[str, Branch]] = defaultdict(dict)
        self.support_multiple_edges = False
        self.compiled = False

langgraph 代码的主要流程 构建node、edge,然后将其组为graph,自然 langchain 会提供很多现成封装,将各种组件封装为 node/edge。比如两个 为tool 提供了 ToolNode(将tool转为 node,因为node 一般入参是stateGraph,出餐是dict), tools_condition(是一个入参包含stateGraph 的函数,返回Literal)

web_search_tool = TavilySearchResults(k=3)
tools = [web_search_tool]
retrieve = ToolNode(tools)
...
workflow.add_conditional_edges(
    "agent",
    # Assess agent decision
    tools_condition,
    {
        # Translate the condition outputs to nodes in our graph
        "tools": "retrieve",
        END: END,
    },
)
workflow.add_node("retrieve", retrieve) 
from langgraph_core.tools import BaseTool
class BaseTool(RunnableSerializable[Union[str, Dict], Any]):
    name: str
    description: str
    def invoke(self, input: Union[str, Dict],config: Optional[RunnableConfig] = None,**kwargs: Any,) -> Any:
        ...
        return self.run(...)
class Tool(BaseTool):
    description: str = ""
    func: Optional[Callable[..., str]]
    coroutine: Optional[Callable[..., Awaitable[str]]] = None

from langgraph.prebuilt import ToolNode
class ToolNode(RunnableCallable):
    def __init__( self,tools: Sequence[BaseTool],*,name: str = "tools",tags: Optional[list[str]] = None,) -> None:
        super().__init__(self._func, self._afunc, name=name, tags=tags, trace=False)
        self.tools_by_name = {tool.name: tool for tool in tools}
    def _func(self, input: Union[list[AnyMessage], dict[str, Any]], config: RunnableConfig) -> Any:
        message = messages[-1]
        def run_one(call: ToolCall):
            output = self.tools_by_name[call["name"]].invoke(call["args"], config)
            return ToolMessage(...output...)
        with get_executor_for_config(config) as executor:
            outputs = [*executor.map(run_one, message.tool_calls)]
            return outputs 或者 {"messages": outputs}

def tools_condition(state: Union[list[AnyMessage], dict[str, Any]],) -> Literal["tools", "__end__"]:
    if isinstance(state, list):
        ai_message = state[-1]
    elif messages := state.get("messages", []):
        ai_message = messages[-1]
    else:
        raise ValueError(f"No messages found in input state to tool_edge: {state}")
    if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
        return "tools"
    return "__end__"

其它

从llamaindex的代码看,问答链路多种多样(比如RouterQueryEngine/MultiStepQueryEngine等),一种链路是一种queryEngine,每种queryEngine有不同的组件,比如RouterQueryEngine提出了 BaseSelector(可以EmbeddingSingleSelector 也可以是LLMSingleSelector) summarizer(BaseSynthesizer子类) 抽象,有分有合。langchain类似,会提各种xxChain,创建chain的时候也会指定一些抽象,比如xxComprossor(实质就是rerank)但不如llamaindex 明显。

class RouterQueryEngine(BaseQueryEngine):
    def __init__( self,
        selector: BaseSelector,
        query_engine_tools: Sequence[QueryEngineTool],
        llm: Optional[LLM] = None,
        summarizer: Optional[TreeSummarize] = None,
    ) -> None:
        ...

也就是,问答链路复杂了,有多个step, step 如何串成piipeline, spep 之间如何传递数据,此时有几种选择

  1. 每个组件都遵守比如Runnable 接口(输入输出是dict 或很宽泛的Input/Oupput,也是一种抽象,但这种抽象几乎意义不大),然后串起来。 此时要解决 控制流(分支、循环)以及组件间的参数传递问题。
  2. 每种链路提出一个抽象
  3. 控制流:一个抽象固化了问答的先后顺序,比如 RouterQueryEngine/MultiStepQueryEngine。langchain的各种xxChain。
  4. 组件抽象:具体到链路的每个step也逐步提出一些抽象,比如MultiStepQueryEngine.selector 和 MultiStepQueryEngine.summarizer,selector/summarizer实质都是llm 调用,主要是用的prompt以及附带的入参不同。稍微复杂一点的复用,可以抽象为tool。比如对于MultiStepQueryEngine 链路就是:selector ==> tool ==> selector == tool ==>summarizer。业务开发时,自己手撸方法 子方法,相当于是RouterQueryEngine及BaseSelector 的丐版。
  5. 参数传递:参数的传递可以通过QueryEngine/XXChain 的成员变量
  6. langgraph 是一种中间态,复用靠把箭头指向原有链路,一条子链路存在的本身就是一种抽象,信息传递靠GraphState。

框架的作用,很多时候也不是没它不行,框架是一个约束和规范,没有它经常会跑偏。