前言(未完成)
向量模型是指通过特定的嵌入(embedding)技术,将原始的非结构化数据转换成一个数值型的向量,在数学表示上,向量是一个由浮点数或者二值型数据组成的 n 维数组。
面向 RAG 应用开发者的实用指南和建议在 RAG 应用生产环境中有效部署向量数据库的关键技巧:
- 设计一个有效的 Schema:仔细考虑数据结构及其查询方式,创建一个可优化性能和提供可扩展性的 Schema。
- 动态 Schema vs. 固定 Schema。。动态 Schema 提供了灵活性,简化了数据插入和检索流程,无需进行大量的数据对齐或 ETL 过程。这种方法非常适合需要更改数据结构的应用。另一方面,固定 Schema 也十分重要,因为它们有着紧凑的存储格式,在性能效率和节约内存方面表现出色。
- 设置主键和 Partition key
- 选择 Embedding 向量类型。稠密向量 (Dense Embedding);稀疏向量(Sparse Embedding);二进制向量(Binary Embedding)
- 考虑可扩展性:考虑未来的数据规模增长,并充分设计架构以适应不断增长的数据量和用户流量。
- 选择最佳索引并优化性能:可以针对向量数据构建高效的索引结构,如倒排索引、树形结构(如 KD 树、Ball Tree)或近似最近邻搜索算法(如FAISS、HNSW),加速检索过程。
milvus
向量数据库
当我们把通过模型或者 AI 应用处理好的数据喂给它之后(“一堆特征向量”),它会根据一些固定的套路,例如像传统数据库进行查询优化加速那样,为这些数据建立索引。避免我们进行数据查询的时候,需要笨拙的在海量数据中进行。
本地
faiss 原生使用
# 准备数据
model = SentenceTransformer('uer/sbert-base-chinese-nli')
sentences = ["住在四号普里怀特街的杜斯利先生及夫人非常骄傲地宣称自己是十分正常的人",
"杜斯利先生是一家叫作格朗宁斯的钻机工厂的老板", "哈利看着她茫然地低下头摸了摸额头上闪电形的伤疤",
"十九年来哈利的伤疤再也没有疼过"]
sentence_embeddings = model.encode(sentences)
# 建立索引
dimension = sentence_embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(sentence_embeddings)
# 检索
topK = 2
search = model.encode(["哈利波特猛然睡醒"]) # 将要搜索的内容“哈利波特猛然睡醒”编码为向量
D, I = index.search(search, topK) # D指的是“数据置信度/可信度” I 指的是我们之前数据准备时灌入的文本数据的具体行数。
print(I)
print([x for x in sentences if sentences.index(x) in I[0]])
faiss 与LangChain 集合,主要是与 LangChain 的 document和 Embeddings 结合。 faiss 本身只存储 文本向量化后的向量(index.faiss文件),但是vector db对外使用,一定是文本查文本,所以要记录 文本块与向量关系(index.pkl文件)。此外,需支持新增和删除文件(包含多个文本块),所以也要支持按文件删除 文本块对应的向量。
from langchain.document_loaders import TextLoader
# 录入documents 到faiss
loader = TextLoader("xx.txt") # 加载文件夹中的所有txt类型的文件
documents = loader.load() # 将数据转成 document 对象,每个文件会作为一个 document
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents) # 切割加载的 document
embeddings = OpenAIEmbeddings() # 初始化 openai 的 embeddings 对象
db = FAISS.from_documents(docs, embeddings) # 将 document 通过 openai 的 embeddings 对象计算 embedding 向量信息并临时存入 faiss 向量数据库,用于后续匹配查询
query = "What did the president say about Ketanji Brown Jackson"
docs = db.similarity_search(query)
print(docs[0].page_content)
简单的源码分析
# 根据文档内容构建 langchain.vectorstores.Faiss
vectorstore.base.from_documents(cls: Type[VST],documents: List[Document], embedding: Embeddings, **kwargs: Any,) -> VST:
"""Return VectorStore initialized from documents and embeddings."""
texts = [d.page_content for d in documents]
metadatas = [d.metadata for d in documents]
return cls.from_texts(texts, embedding, metadatas=metadatas, **kwargs)
# Embeds documents.
embeddings = embedding.embed_documents(texts)
cls.__from(texts,embeddings,embedding, metadatas=metadatas,ids=ids,**kwargs,)
# Initializes the FAISS database
faiss = dependable_faiss_import()
index = faiss.IndexFlatL2(len(embeddings[0]))
vector = np.array(embeddings, dtype=np.float32)
index.add(vector)
# 建立id 与text 的关联
documents = []
if ids is None:
ids = [str(uuid.uuid4()) for _ in texts]
for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}
documents.append(Document(page_content=text, metadata=metadata))
index_to_id = dict(enumerate(ids))
# Creates an in memory docstore
docstore = InMemoryDocstore(dict(zip(index_to_id.values(), documents)))
return cls(embedding.embed_query,index,docstore,index_to_id,normalize_L2=normalize_L2,**kwargs,)
save_local:
faiss = dependable_faiss_import()
faiss.write_index(self.index, str(path / "{index_name}.faiss".format(index_name=index_name)))
with open(path / "{index_name}.pkl".format(index_name=index_name), "wb") as f:
pickle.dump((self.docstore, self.index_to_docstore_id), f)
在线
Pinecone 是一个在线的向量数据库。所以,我可以第一步依旧是注册,然后拿到对应的 api key。
from langchain.vectorstores import Pinecone
# 从远程服务加载数据
docsearch = Pinecone.from_existing_index(index_name, embeddings)
# 录入documents 持久化数据到pinecone
# 初始化 pinecone
pinecone.init(api_key="你的api key",environment="你的Environment")
loader = DirectoryLoader('/content/sample_data/data/', glob='**/*.txt')
documents = loader.load() # 将数据转成 document 对象,每个文件会作为一个 document
text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=0)
split_docs = text_splitter.split_documents(documents) # 切割加载的 document
docsearch = Pinecone.from_texts([t.page_content for t in split_docs], embeddings, index_name=index_name) # 持久化数据到pinecone