
Build a Data Pipeline for AI: Use ChatGPT on Your Own Data

Wondering how to use ChatGPT on your own data - which wasn't in the training datasets? Here's how to use a real-time data pipeline, with an example.


Large Language Models (LLMs) like ChatGPT are great for answering questions that are covered by the datasets they have been trained on, but what if you want to use them with more recent data, or with data specific to your business, neither of which was part of their training dataset? Asking these kinds of questions will inevitably lead to incomplete or inaccurate responses.

You can use a technique called Retrieval Augmented Generation to solve this problem. This involves passing additional related documents to the LLM along with the question itself. The LLM then uses that related information along with its trained "understanding of the world" to produce a response. 

To use this technique, you’ll first need to get your data into a vector database like Pinecone. For the uninitiated, vector databases allow you to store and retrieve related documents based on their vector embeddings — a data representation that allows ML models to understand semantic similarity. 
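To make the idea concrete, here's a minimal Python sketch of retrieval augmented generation. The embed() function and the in-memory list of (text, vector) pairs are hypothetical stand-ins for a real embedding model and a vector database like Pinecone; the point is just to show how similarity search and prompt assembly fit together.

python
# Minimal RAG sketch. embed() is a hypothetical stand-in for any embedding
# model; docs is an in-memory stand-in for a vector database like Pinecone.
from typing import Callable, List, Tuple

import numpy as np


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Semantic similarity between two embedding vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


def retrieve(
    query: str,
    docs: List[Tuple[str, np.ndarray]],
    embed: Callable[[str], np.ndarray],
    k: int = 3,
) -> List[str]:
    """Return the k stored documents most similar to the query."""
    q = embed(query)
    ranked = sorted(docs, key=lambda d: cosine_similarity(q, d[1]), reverse=True)
    return [text for text, _ in ranked[:k]]


def build_prompt(query: str, context: List[str]) -> str:
    """Pass the retrieved documents to the LLM along with the question itself."""
    joined = "\n---\n".join(context)
    return (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{joined}\n\n"
        f"Question: {query}"
    )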

In this article, we’ll move data into Pinecone with a real-time data pipeline, and use retrieval augmented generation to teach ChatGPT about a novel topic. We’ll do this with Estuary Flow's Pinecone Materialization Connector.

We'll build our pipeline step-by-step, using an example scenario to illustrate the problem and how to solve it.

Teaching ChatGPT Something New

We’re gonna get a little meta here: Estuary Flow is the tool we’re using to build our AI data pipeline, and it’s also the topic we’ll be teaching ChatGPT about. Since Estuary has only been around for a few years, its documentation wasn’t included in ChatGPT’s training data. 

And depending on where you found this article, you might not be familiar with Flow, either. 

Flow is a managed platform for moving data between systems you care about, in real time. But to really master Flow, you need to understand some more nuanced concepts. One of these is derivations — how you transform data-in-motion (and something that we'll be using a little later in this article!).

Let’s suppose you want to ask ChatGPT questions to learn more about derivations. You ask a simple question: "What modes does Estuary Flow support for shuffling documents as part of a derivation?"

Depending on ChatGPT's mood, you might get an answer like this:

I apologize, but I couldn't find any information or references to "Estuary Flow" in my training data up until September 2021. It's possible that Estuary Flow is a more recent development or a specialized tool that hasn't been widely discussed or documented yet.

This answer is at least honest, but not very helpful. We've immediately hit the recency limits of the training dataset. We need to supplement ChatGPT's built-in knowledge with a knowledge base of our own. The Flow docs for derivations certainly contain the answer to our question, so how do we teach ChatGPT about derivations?

We can ingest, or capture, the documentation using Flow, transform it into the correct format, and materialize it to Pinecone. Then we'll use Retrieval Augmented Generation and see what ChatGPT has to say.

Flow supports many capture connectors, which can pull data in real time from a variety of sources like databases and SaaS systems. For our example, we'll use the Notion capture connector and pull data from a page in Notion that has been populated with content from Flow's documentation about derivations.

Capturing the Data From Notion

Setting up a Notion capture in the Flow web application is straightforward.

Once the capture is up and running, Flow starts to pull data from Notion into a collection of JSON documents. They look like this:

json
{
  "_meta": {
    "uuid": "9a5d554d-ff0c-11ed-9001-6904a26633d6"
  },
  "archived": false,
  "created_by": {
    "id": "69ba7661-d95d-4667-8d59-e422266e33cb",
    "object": "user"
  },
  "created_time": "2023-05-30T17:02:00.000Z",
  "has_children": false,
  "id": "97954ae4-1b92-4981-80b3-5ae8251644d4",
  "last_edited_by": {
    "id": "69ba7661-d95d-4667-8d59-e422266e33cb",
    "object": "user"
  },
  "last_edited_time": "2023-05-30T17:02:00.000Z",
  "object": "block",
  "paragraph": {
    "color": "default",
    "rich_text": [
      {
        "annotations": {
          "bold": false,
          "code": false,
          "color": "default",
          "italic": false,
          "strikethrough": false,
          "underline": false
        },
        "href": null,
        "plain_text": "Flow offers three modes for configuring document shuffles: key, any, and lambda.",
        "text": {
          "content": "Flow offers three modes for configuring document shuffles: key, any, and lambda.",
          "link": null
        },
        "type": "text"
      }
    ]
  },
  "parent": {
    "page_id": "142fdd4c-6f19-43d9-972a-d89fdc295342",
    "type": "page_id"
  },
  "type": "paragraph"
}

This document has quite a bit of information. Once captured, collections in Flow can be reused multiple times for different purposes, so some of the extra information might be useful for other work in the future. For right now, though, it's clear that the "paragraph" field of the document contains the information most relevant to the question at hand.

But we're not quite at the point where we can send the data along to ChatGPT in any practical way yet.

Transforming the Data

To transform the data into something we can use in our pipeline, we're going to use a derivation: the very thing we are teaching ChatGPT about! Derivations can do all sorts of interesting things with a source collection of data, using either SQLite or TypeScript to produce a new collection of transformed documents. For our current task, a simple SQLite transformation will do the trick (for steps, see the Streaming SQL Full Guide).

We want to get the source data document into a concise, flat structure that is compatible with Pinecone's metadata system. Flow's Pinecone materialization expects the document to include a field called "input" which will contain the relevant text we want to embed and use as context. Any additional fields in the document will be included in Pinecone as metadata fields.

This data mapping can be accomplished with a single SQL statement:

sql
SELECT json_object(
    'id', $id,
    'input', $paragraph$rich_text->0->'plain_text',
    'bold', $paragraph$rich_text->0->'annotations'->'bold',
    'color', $paragraph$rich_text->0->'annotations'->'color',
    'strikethrough', $paragraph$rich_text->0->'annotations'->'strikethrough'
) WHERE
    $archived = false AND
    $type = 'paragraph' AND
    json_array_length($paragraph$rich_text) > 0;

We're making extensive use of SQLite's JSON functions and operators to map data from the source collection into the derived collection. The most important fields are 'id', which provides a unique ID for every document, and 'input', which is the actual content we're after. A few other potentially interesting fields are extracted to be included as metadata. Also note that we only include blocks that have not been archived in Notion, and only paragraph-type blocks that actually contain text; this filtering is handled by the conditions in the WHERE clause.
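If you'd like to see those JSON functions in isolation, here's a small standalone sketch using Python's built-in sqlite3 module against a trimmed-down, hypothetical copy of the captured Notion block. It assumes your SQLite build includes the JSON1 functions (recent Python distributions do); json_extract is the function form of the -> operator used in the derivation above.

python
import json
import sqlite3

# A trimmed-down, hypothetical stand-in for a captured Notion block.
doc = {
    "archived": False,
    "type": "paragraph",
    "paragraph": {
        "rich_text": [
            {
                "plain_text": "Flow offers three modes for configuring document shuffles: key, any, and lambda.",
                "annotations": {"bold": False, "color": "default", "strikethrough": False},
            }
        ]
    },
}

con = sqlite3.connect(":memory:")
row = con.execute(
    """
    SELECT json_extract(:doc, '$.paragraph.rich_text[0].plain_text'),
           json_array_length(:doc, '$.paragraph.rich_text')
    """,
    {"doc": json.dumps(doc)},
).fetchone()

# Prints the extracted paragraph text and the rich_text array length (1).
print(row)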

This derivation produces a nice set of output documents that can be previewed using Flow’s CLI, flowctl. The transformed documents look like this, and are in a form that is ready to materialize to Pinecone:

json
{
  "_meta": {
    "uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef"
  },
  "bold": false,
  "code": false,
  "color": "default",
  "id": "97954ae4-1b92-4981-80b3-5ae8251644d4",
  "input": "Flow offers three modes for configuring document shuffles: key, any, and lambda.",
  "strikethrough": false
}

Materializing the Data to Pinecone

Now we can set up the materialization, which will continuously add the properly formatted data from our derived collection into Pinecone. 

If you’re following along, the process is quite similar to creating a capture. When you create a materialization, use the Pinecone connector.

The Pinecone materialization integrates with OpenAI to create vector embeddings based on a text input using the text-embedding-ada-002 model. To set the materialization up, we'll need:

  • A Pinecone account with an API key for authentication. The free tier works great for running this example materialization.
  • An OpenAI account with an API key for authentication. The OpenAI API is a paid service, but a free trial is available.
  • A Pinecone index created to store materialized vector embeddings. Since we're using text-embedding-ada-002, the index must have Dimensions set to 1536 (a sketch for creating the index from code follows this list).
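If you'd rather create the index from code than through the Pinecone console, a minimal sketch using the same classic pinecone-client API as the scripts later in this article might look like this (the index name and credentials are placeholders):

python
import os

import pinecone

# Placeholders: supply your own API key, environment, and index name.
pinecone.init(
    api_key=os.getenv("PINECONE_API_KEY"),
    environment=os.getenv("PINECONE_ENVIRONMENT"),
)

# text-embedding-ada-002 produces 1536-dimensional vectors, so the index
# dimension must match.
if "flow-index" not in pinecone.list_indexes():
    pinecone.create_index("flow-index", dimension=1536, metric="cosine")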

For this example, we’ll select the collection we just derived to materialize to Pinecone. But if we had multiple collections — representing multiple pages in Notion — we could select them all. Each of these collections would be added to a separate namespace in the index. You can configure the namespace the connector uses for each collection and it will be created automatically if it doesn't already exist in Pinecone. The namespace name defaults to the collection name.

After the materialization is published, vectors will immediately start to be added to our Pinecone index.

The data pipeline is now complete: We are continuously capturing data from Notion into a collection, continuously transforming data from that collection into a new derived collection suitable for materializing to Pinecone, and materializing that data to Pinecone. As new data is added to the source, it will automatically be captured, transformed, and materialized to Pinecone as a vector embedding.

Using Pinecone with ChatGPT

The heavy lifting of setting up the data pipeline is done, and you can now use Pinecone to do whatever vector database-y things you wish; there are lots of possibilities. Since we wanted to use it with ChatGPT for retrieval augmented generation, let's see what that might look like.

You can also get these scripts from GitHub.

Querying Pinecone Directly

As a sanity check, we can query Pinecone directly to see the most relevant documents it contains for the question we are trying to answer. This Python script will do exactly that. You'll need openai and pinecone-client installed. This exact query won't end up being necessary for our retrieval augmented generation system, but is a good illustration of what is happening behind the scenes.

python
import os

import openai
import pinecone

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or "OPENAI_API_KEY"
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY") or "PINECONE_API_KEY"
PINECONE_ENVIRONMENT = os.getenv("PINECONE_ENVIRONMENT") or "PINECONE_ENVIRONMENT"

model_name = "text-embedding-ada-002"
index_name = "flow-index"  # Replace with your index name
namespace = "flow-docs"  # Replace with your namespace

# Connect to Pinecone and the index holding our materialized embeddings.
pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_ENVIRONMENT)
pinecone.whoami()
index = pinecone.Index(index_name)

# Authenticate with OpenAI; listing engines is a quick connectivity check.
openai.api_key = OPENAI_API_KEY
openai.Engine.list()

# Enter your query here.
query = "What modes does Estuary Flow support for shuffling documents as part of a derivation?"

# Embed the query with the same model used by the materialization.
query_embedding = openai.Embedding.create(input=[query], engine=model_name)
embedding_value = query_embedding["data"][0]["embedding"]

# Retrieve the 3 most similar documents, including their metadata.
res = index.query(embedding_value, top_k=3, include_metadata=True, namespace=namespace)

for match in res["matches"]:
    print(match["metadata"]["input"])
    print("----")

Running the script will output the top 3 most relevant documents. The top match is this:

Flow offers three modes for configuring document shuffles: key, any, and lambda.

To say this is a relevant piece of information would be an understatement! ChatGPT should have no problem answering our question with this bit of information provided as context.

Langchain Retrieval Question/Answering

For our retrieval augmented question/answering we'll use Langchain. The script for this isn't much more complicated than the simple sanity check we implemented previously, thanks to Langchain's built-in support for Pinecone and OpenAI:

python
import os

import pinecone
from langchain.vectorstores import Pinecone
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or "OPENAI_API_KEY"
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY") or "PINECONE_API_KEY"
PINECONE_ENVIRONMENT = os.getenv("PINECONE_ENVIRONMENT") or "PINECONE_ENVIRONMENT"

model_name = "text-embedding-ada-002"
text_field = "input"
index_name = "flow-index"  # Replace with your index name
namespace = "flow-docs"  # Replace with your namespace name

pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_ENVIRONMENT)

embed = OpenAIEmbeddings(model=model_name, openai_api_key=OPENAI_API_KEY)

vectorstore = Pinecone(
    pinecone.Index(index_name), embed.embed_query, text_field, namespace
)

llm = ChatOpenAI(
    openai_api_key=OPENAI_API_KEY, model_name="gpt-3.5-turbo", temperature=0.0
)

qa = RetrievalQA.from_chain_type(llm=llm, retriever=vectorstore.as_retriever())

# Enter your query here.
query = "What modes does Flow support for shuffling documents as part of a derivation?"

answer = qa.run(query)

print("Question: ", query)
print("----")
print("Answer: ", answer)

And now we have our answer from ChatGPT:

Question: What modes does Flow support for shuffling documents as part of a derivation?

----

Answer: Flow offers three modes for configuring document shuffles: key, any, and lambda.

You might be thinking this is kind of silly since the answer is exactly the same as the most relevant document retrieved from the vector database, but this isn't always the case. We can ask a more open-ended question:

Question: How does document shuffling work for a Flow derivation?

----

Answer: In a Flow derivation, read source documents are first shuffled on a shuffle key to co-locate the processing of documents that have equal shuffle keys. This means that the shard that should process a particular source document is identified in order to co-locate that processing with other documents it may need to know about. The transformation then processes documents by invoking lambdas: user-defined functions that accept documents as arguments, return documents in response, and potentially update internal task state.

This answer is actually produced from several bits of related information from the vector database context, with the top 3 most relevant pieces of context being:

Shuffles let Flow identify the shard that should process a particular source document, in order to co-locate that processing with other documents it may need to know about.

----

Roughly speaking, the derivation will globally process transformations and their source documents in the time-based order in which the source documents were originally written to their source collections. This means that a derivation started a month ago and a new copy of the derivation started today, will process documents in the same order and arrive at the same result. Derivations are repeatable.

----

Read source documents are first shuffled on a shuffle key to co-locate the processing of documents that have equal shuffle keys. The transformation then processes documents by invoking lambdas: user-defined functions that accept documents as arguments, return documents in response, and potentially update internal task state.

It's Really Real-Time

We've been calling this a real-time pipeline all along, and now we can finally see that in action.

The system knows a little bit about Estuary Flow, but not everything. We haven't told it anything about any topics other than derivations. If we ask it a question that is related to derivations but a little outside of the knowledge base we have loaded so far, we get a decent, but not great, answer:

Question: How can I verify Flow derivations are working correctly?

----

Answer: Derivations re-validate their source documents against the source collection's schema as they are read. This means that if there is an error in the schema, the derivation will pause and give you an opportunity to correct the problem. Additionally, all collection documents contain a wall-clock timestamp of when they were published, and the transforms of a derivation will generally process source documents in ascending wall-time order. You can also refine the relative order in which source documents are read by using a read delay, which is useful for implementing arbitrary window policies.

Let's try this again, after adding material from the docs about tests to our Notion page:

Question: How can I verify Flow derivations are working correctly?

----

Answer: You can use Flow tests to verify the end-to-end behavior of any modified schemas or derivations included in your Data Flow. You can feed example documents into a collection, and then verify that documents coming out of a derived collection meet your test's expectation. The Flow web application automatically performs basic tests to validate the configurations of captures and materializations. As your Data Flows grow in breadth and scope, and as requirements change or new contributors get involved, more robust tests are invaluable for ensuring the correctness of your data products. You can learn more about testing large or complex derivations in the derivation pattern examples of Flow's repository.

This is a much more complete and accurate answer.

As our source knowledge base grows, so does the ability of ChatGPT to answer questions about Flow. With the data pipeline we have set up, this all happens automatically and in real time, fully managed and capable of operating at massive scale.

LLMs like ChatGPT are impressive, but they aren't magic: The output they produce is only as good as the input information they have available. Using Flow with Pinecone ensures that input is available in a high-quality form as fast as possible.


Author

William Baker, Engineer
