Check Data Ingestion Status

As an alternative to polling, you can use webhooks to get notified when episodes finish processing.

Zep processes added data asynchronously, which can take anywhere from a few seconds to a few minutes. This recipe shows how to check whether an ingestion operation has finished processing.

Zep provides two methods for checking data ingestion status:

  • Task polling: Use client.task.get() to check the status of clone operations and fact triple additions
  • Episode polling: Use client.graph.episode.get() to check the processing status of individual episodes

For tracking large historical ingestions, see the Batch API, which has its own progress reporting via batch.get and per-item status via batch.list_items.

Checking Operation Status with Task Polling

When using operations that return a task_id, you can poll for completion status using client.task.get(). The following operations return a task_id:

  • graph.clone() - Graph cloning operations
  • graph.add_fact_triple() - Custom fact/node triplet additions

The pattern is the same in both cases: capture the task_id returned by the operation, then poll client.task.get(task_id=task_id) until status is succeeded or failed.
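The polling pattern described above can be sketched as a small helper. This is a minimal sketch, not part of the Zep SDK: wait_for_task, poll_interval, and timeout are hypothetical names introduced here for illustration, and the helper assumes only that the object returned by the getter exposes a status attribute that eventually becomes "succeeded" or "failed".

```python
import time
from typing import Any, Callable


def wait_for_task(
    get_task: Callable[..., Any],
    task_id: str,
    poll_interval: float = 1.0,
    timeout: float = 300.0,
) -> Any:
    """Poll get_task(task_id=...) until the task succeeds or fails.

    get_task is any callable that accepts a task_id keyword argument and
    returns an object with a status attribute (assumed shape).
    """
    deadline = time.monotonic() + timeout
    while True:
        task = get_task(task_id=task_id)
        if task.status == "succeeded":
            return task
        if task.status == "failed":
            raise RuntimeError(f"Task {task_id} failed")
        # Any other status is treated as "still in progress".
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Task {task_id} did not finish within {timeout} seconds"
            )
        time.sleep(poll_interval)
```

With a Zep client, the getter would presumably be client.task.get, for example wait_for_task(client.task.get, task_id). Passing the getter in as an argument keeps the helper independent of the SDK and easy to exercise with a stub.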

Checking Individual Episode Status with Episode Polling

For single-episode operations, or when you need the status of an individual episode, use the graph.episode.get() method. This approach is useful when adding data one episode at a time.

First, let’s create a user:

import os
import time
import uuid

from dotenv import find_dotenv, load_dotenv
from zep_cloud.client import Zep

# Load ZEP_API_KEY from a local .env file, if one is found.
load_dotenv(dotenv_path=find_dotenv())

client = Zep(api_key=os.environ.get("ZEP_API_KEY"))

# A short random suffix gives each run a distinct user ID.
uuid_value = uuid.uuid4().hex[:4]
user_id = "-" + uuid_value
client.user.add(
    user_id=user_id,
    first_name="John",
    last_name="Doe",
    email="[email protected]",
)

Now, let's add some data and immediately search for it. Because the episode has not finished processing yet, the search results do not include the data we just added:

# Add a text episode to the user's graph; processing happens asynchronously.
episode = client.graph.add(
    user_id=user_id,
    type="text",
    data="The user is an avid fan of Eric Clapton",
)

# Search immediately, before the episode has been processed.
search_results = client.graph.search(
    user_id=user_id,
    query="Eric Clapton",
    scope="nodes",
    limit=1,
    reranker="cross_encoder",
)

print(search_results.nodes)
None

To find out when the episode has finished processing, pass the uuid_ of the episode returned by graph.add to graph.episode.get and check the processed field of the result:

# Poll once per second until the episode is marked as processed.
while True:
    episode = client.graph.episode.get(
        uuid_=episode.uuid_,
    )
    if episode.processed:
        print("Episode processed successfully")
        break
    print("Waiting for episode to process...")
    time.sleep(1)
Waiting for episode to process...
Waiting for episode to process...
Waiting for episode to process...
Waiting for episode to process...
Waiting for episode to process...
Episode processed successfully
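The loop above never exits if an episode is never marked as processed. One way to bound the wait is to add a timeout and back off between polls. The following is a minimal sketch under stated assumptions: wait_for_episode, initial_delay, max_delay, and timeout are hypothetical names that are not part of the Zep SDK, and the helper relies only on the getter accepting a uuid_ keyword argument and returning an object with a processed attribute, as in the loop above.

```python
import time
from typing import Any, Callable


def wait_for_episode(
    get_episode: Callable[..., Any],
    episode_uuid: str,
    initial_delay: float = 1.0,
    max_delay: float = 10.0,
    timeout: float = 300.0,
) -> Any:
    """Poll get_episode(uuid_=...) until episode.processed is truthy.

    The delay between polls doubles after each attempt, capped at
    max_delay. TimeoutError is raised once timeout seconds have passed.
    """
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while True:
        episode = get_episode(uuid_=episode_uuid)
        if episode.processed:
            return episode
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                f"Episode {episode_uuid} was not processed within {timeout} seconds"
            )
        # Never sleep past the deadline.
        time.sleep(min(delay, remaining))
        delay = min(delay * 2, max_delay)
```

In this recipe the call would look like wait_for_episode(client.graph.episode.get, episode.uuid_). The default delays and timeout are illustrative and should be tuned to the size of the data being ingested.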

Now that the episode has finished processing, we can search for the data we just added, and this time we get a result:

search_results = client.graph.search(
    user_id=user_id,
    query="Eric Clapton",
    scope="nodes",
    limit=1,
    reranker="cross_encoder",
)

print(search_results.nodes)
[EntityNode(attributes={'category': 'Music', 'labels': ['Entity', 'Preference']}, created_at='2025-04-05T00:17:59.66565Z', labels=['Entity', 'Preference'], name='Eric Clapton', summary='The user is an avid fan of Eric Clapton.', uuid_='98808054-38ad-4cba-ba07-acd5f7a12bc0', graph_id='6961b53f-df05-48bb-9b8d-b2702dd72045')]