Check Data Ingestion Status

Data added to Zep is processed asynchronously and can take anywhere from a few seconds to a few minutes to finish. This recipe shows how to check whether your data upload operations have finished processing.

Zep provides two methods for checking data ingestion status:

  • Task polling (recommended for operations that return a task_id): Use client.task.get() to check the status of batch operations, clone operations, and fact triple additions
  • Episode polling: Use graph.episode.get() to check the processing status of individual episodes

Checking Operation Status with Task Polling

When using operations that return a task_id, you can poll for completion status using client.task.get(). The following operations return a task_id:

  • graph.add_batch() - Batch episode additions
  • thread.add_messages_batch() - Batch message additions to threads
  • graph.clone() - Graph cloning operations
  • graph.add_fact_triple() - Custom fact/node triplet additions

This is the recommended approach for these operations as it provides a single status check for the entire operation.
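
Because every operation in the list above returns a task_id, the same polling pattern works for all of them. The helper below is a minimal sketch of that pattern; the wait_for_task name and the timeout and poll_interval values are illustrative choices, while the client.task.get() call and the "completed" and "failed" status values are the ones used later in this recipe:

import time

def wait_for_task(client, task_id, timeout=300, poll_interval=1):
    """Poll client.task.get() until the task completes, fails, or times out."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        task = client.task.get(task_id=task_id)
        if task.status == "completed":
            return task
        if task.status == "failed":
            raise RuntimeError(f"Task {task_id} failed: {task.error}")
        time.sleep(poll_interval)
    raise TimeoutError(f"Task {task_id} did not finish within {timeout} seconds")

You could call wait_for_task(client, task_id) after any of the operations listed above instead of writing an open-ended loop.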

First, let’s create a user:

import os
import uuid
import time
from dotenv import find_dotenv, load_dotenv
from zep_cloud.client import Zep
from zep_cloud import EpisodeData

load_dotenv(dotenv_path=find_dotenv())

client = Zep(api_key=os.environ.get("ZEP_API_KEY"))
uuid_value = uuid.uuid4().hex[:4]
user_id = "-" + uuid_value
client.user.add(
    user_id=user_id,
    first_name="John",
    last_name="Doe",
    email="[email protected]"
)

Now, let’s add a batch of episodes to the graph. The response includes a task_id in each episode that we can use to check the processing status:

# Add batch data to the graph
episodes = [
    EpisodeData(
        data="The user is an avid fan of Eric Clapton",
        type="text"
    ),
    EpisodeData(
        data="The user attended a concert last night",
        type="text"
    ),
    EpisodeData(
        data="The user plays guitar as a hobby",
        type="text"
    )
]

batch_result = client.graph.add_batch(
    user_id=user_id,
    episodes=episodes
)

# Get the task_id from the first episode (all episodes in a batch share the same task_id)
task_id = batch_result[0].task_id
print(f"Batch processing task ID: {task_id}")
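
If you also want to track the individual episodes in the batch, each returned episode exposes its own uuid_ alongside the shared task_id (this assumes the episodes returned by graph.add_batch carry the same uuid_ field used with graph.episode.get later in this recipe):

# Illustrative sketch: list each episode's uuid_ together with the shared task_id,
# so the per-episode check shown in the next section can also be used if needed.
for ep in batch_result:
    print(f"episode uuid: {ep.uuid_}, task_id: {ep.task_id}")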

Now we can poll the task status using client.task.get() to check when the entire batch has finished processing:

# Poll the task status until completion
while True:
    task = client.task.get(task_id=task_id)

    if task.status == "completed":
        print("Batch processing completed successfully")
        break
    elif task.status == "failed":
        print(f"Batch processing failed: {task.error}")
        break

    print(f"Batch processing status: {task.status}")
    time.sleep(1)

Once the batch is complete, you can search for the data that was added:

search_results = client.graph.search(
    user_id=user_id,
    query="Eric Clapton",
    scope="nodes",
    limit=1,
    reranker="cross_encoder",
)

print(search_results.nodes)

Checking Individual Episode Status with Episode Polling

For single episode operations or when you need to check the status of individual episodes, you can use the graph.episode.get() method. This approach is useful when adding data one episode at a time.

First, let’s create a user:

import os
import uuid
import time
from dotenv import find_dotenv, load_dotenv
from zep_cloud.client import Zep

load_dotenv(dotenv_path=find_dotenv())

client = Zep(api_key=os.environ.get("ZEP_API_KEY"))
uuid_value = uuid.uuid4().hex[:4]
user_id = "-" + uuid_value
client.user.add(
    user_id=user_id,
    first_name="John",
    last_name="Doe",
    email="[email protected]"
)

Now, let’s add some data and immediately try to search for it. Because data added to Zep is processed asynchronously and can take a few seconds to a few minutes to finish, the search results do not yet include the data we just added:

episode = client.graph.add(
    user_id=user_id,
    type="text",
    data="The user is an avid fan of Eric Clapton"
)

search_results = client.graph.search(
    user_id=user_id,
    query="Eric Clapton",
    scope="nodes",
    limit=1,
    reranker="cross_encoder",
)

print(search_results.nodes)
None

We can check when the episode has finished processing by passing the uuid_ of the episode returned by graph.add to the graph.episode.get method:

while True:
    episode = client.graph.episode.get(
        uuid_=episode.uuid_,
    )
    if episode.processed:
        print("Episode processed successfully")
        break
    print("Waiting for episode to process...")
    time.sleep(1)
Waiting for episode to process...
Waiting for episode to process...
Waiting for episode to process...
Waiting for episode to process...
Waiting for episode to process...
Episode processed successfully
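
If you prefer a bounded wait instead of an open-ended loop, the same check can be wrapped in a small helper. This is a minimal sketch; the wait_for_episode name and the timeout and poll_interval values are illustrative choices, while graph.episode.get and the processed flag are the same ones used above:

def wait_for_episode(client, episode_uuid, timeout=300, poll_interval=1):
    """Poll graph.episode.get() until the episode is processed or the timeout expires."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        episode = client.graph.episode.get(uuid_=episode_uuid)
        if episode.processed:
            return episode
        time.sleep(poll_interval)
    raise TimeoutError(f"Episode {episode_uuid} was not processed within {timeout} seconds")

wait_for_episode(client, episode.uuid_)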

Now that the episode has finished processing, we can search for the data we just added, and this time we get a result:

search_results = client.graph.search(
    user_id=user_id,
    query="Eric Clapton",
    scope="nodes",
    limit=1,
    reranker="cross_encoder",
)

print(search_results.nodes)
[EntityNode(attributes={'category': 'Music', 'labels': ['Entity', 'Preference']}, created_at='2025-04-05T00:17:59.66565Z', labels=['Entity', 'Preference'], name='Eric Clapton', summary='The user is an avid fan of Eric Clapton.', uuid_='98808054-38ad-4cba-ba07-acd5f7a12bc0', graph_id='6961b53f-df05-48bb-9b8d-b2702dd72045')]