OpenAI Assistants Streaming in Python
This sample demonstrates how to use the OpenAI Assistants API with streaming in a Python console application.
main.py
openai_assistants_streaming.py
How to generate this sample
AI - Azure AI CLI, Version 1.0.0
Copyright (c) 2024 Microsoft Corporation. All Rights Reserved.
This PUBLIC PREVIEW version may change at any time.
See: https://aka.ms/azure-ai-cli-public-preview
Generating 'openai-asst-streaming' in 'openai-asst-streaming-py' (3 files)...
main.py
openai_assistants_streaming.py
requirements.txt
Generating 'openai-asst-streaming' in 'openai-asst-streaming-py' (3 files)... DONE!
main.py
STEP 1: Read the configuration settings from environment variables and validate them.
# Read each required setting from the environment, falling back to a
# '<insert ...>' placeholder so the validation below can detect it.
ASSISTANT_ID = os.getenv('ASSISTANT_ID') or "<insert your OpenAI assistant ID here>"
AZURE_OPENAI_API_KEY = os.getenv('AZURE_OPENAI_API_KEY', '<insert your Azure OpenAI API key here>')
AZURE_OPENAI_API_VERSION = os.getenv('AZURE_OPENAI_API_VERSION', '<insert your Azure OpenAI API version here>')
AZURE_OPENAI_ENDPOINT = os.getenv('AZURE_OPENAI_ENDPOINT', '<insert your Azure OpenAI endpoint here>')
AZURE_OPENAI_BASE_URL = f'{AZURE_OPENAI_ENDPOINT.rstrip("/")}/openai'

def _is_configured(value):
    # A setting counts as configured only when it is present and is not
    # one of the '<insert ...>' template placeholders above.
    return value is not None and not value.startswith('<insert')

ok = (_is_configured(ASSISTANT_ID) and
      _is_configured(AZURE_OPENAI_API_KEY) and
      _is_configured(AZURE_OPENAI_API_VERSION) and
      _is_configured(AZURE_OPENAI_ENDPOINT))

if not ok:
    print('To use Azure OpenAI, set the following environment variables:\n' +
        '\n  ASSISTANT_ID' +
        '\n  AZURE_OPENAI_API_KEY' +
        '\n  AZURE_OPENAI_API_VERSION' +
        '\n  AZURE_OPENAI_ENDPOINT')
    print('\nYou can easily do that using the Azure AI CLI by doing one of the following:\n' +
        '\n  ai init' +
        '\n  ai dev shell' +
        '\n  python main.py' +
        '\n' +
        '\n  or' +
        '\n' +
        '\n  ai init' +
        '\n  ai dev shell --run "python main.py"')
    # Use SystemExit rather than os._exit(1): os._exit terminates without
    # flushing stdio, so the help text printed above could be lost when
    # stdout is piped or redirected.
    raise SystemExit(1)
STEP 2: Create the OpenAI client and initialize the assistant.
# Build the Azure-flavored OpenAI client: the API key is sent via the
# 'api-key' header and the API version via a default query parameter.
client_options = {
    'api_key': AZURE_OPENAI_API_KEY,
    'base_url': AZURE_OPENAI_BASE_URL,
    'default_query': { 'api-version': AZURE_OPENAI_API_VERSION },
    'default_headers': { 'api-key': AZURE_OPENAI_API_KEY },
}
openai = OpenAI(**client_options)

# Wrap the client and assistant id in the streaming helper class.
assistant = OpenAIAssistantsStreamingClass(ASSISTANT_ID, openai)
STEP 3: Create or retrieve a thread, and display the messages if any.
# Resume an existing thread when an id was supplied; otherwise start fresh.
if threadId is not None:
    assistant.retrieve_thread(threadId)
else:
    assistant.create_thread()

# Replay any messages already on the thread to the console.
assistant.get_thread_messages(lambda role, content: print(f'{role.capitalize()}: {content}', end=''))
STEP 4: Implement the user interaction loop to get responses from the assistant and display them.
# Read-eval-print loop: an empty line or 'exit' ends the session.
while (user_input := input('User: ')) not in ('exit', ''):
    print('\nAssistant: ', end='')
    # Stream the assistant's reply token-by-token via the callback.
    assistant.get_response(user_input, lambda content: print(content, end=''))
    print('\n')
print(f"Bye! (threadId: {assistant.thread.id})")
openai_assistants_streaming.py
STEP 1: Initialize the assistant with necessary configurations.
def __init__(self, assistant_id, openai):
    """Remember the assistant id and client; no thread exists yet."""
    self.openai = openai
    self.assistant_id = assistant_id
    # Lazily created/retrieved by create_thread()/retrieve_thread().
    self.thread = None
STEP 2: Create an event handler that processes each text delta.
class EventHandler(AssistantEventHandler):
    """Streaming event handler that forwards each text delta to a callback."""

    def __init__(self, openai, callback):
        super().__init__()
        self.callback = callback
        self.openai = openai

    @override
    def on_text_delta(self, delta, snapshot):
        # Hand the incremental text fragment straight to the caller.
        self.callback(delta.value)

    @override
    def on_event(self, event):
        # Surface failed runs loudly; everything else follows the default path.
        failed = event.event == 'thread.run.failed'
        if failed:
            print(event)
            raise Exception('Run failed')
        super().on_event(event)
STEP 3: When the user provides input, add the user message to the chat message history.
# Excerpt (from get_response): append the user's message to the current
# thread before starting the streaming run.
message = self.openai.beta.threads.messages.create(
thread_id=self.thread.id,
role="user",
content=user_input,
)
STEP 4: Send the chat message history to the streaming OpenAI Assistants API and process each update.
# Excerpt (from get_response): start a streaming run; EventHandler
# forwards each text delta to the callback until the run completes.
with self.openai.beta.threads.runs.stream(
thread_id=self.thread.id,
assistant_id=self.assistant_id,
event_handler=EventHandler(self.openai, callback)
) as stream:
stream.until_done()
STEP 5: Send the user's message to the streaming API and deliver the assistant's response through the callback (the method itself returns None).
def get_response(self, user_input, callback) -> None:
    """Send user_input to the assistant and stream the reply.

    Creates a thread lazily on first use. Output is delivered exclusively
    through callback (called once per text delta); the method returns None.
    """
    # Idiomatic None check ('is None', not '== None').
    if self.thread is None:
        self.create_thread()

    # Append the user's message to the thread. The returned message object
    # was previously bound to an unused local; the call is kept for its
    # side effect only.
    self.openai.beta.threads.messages.create(
        thread_id=self.thread.id,
        role="user",
        content=user_input,
    )

    # Run the assistant on the thread, streaming deltas to the callback
    # until the run finishes.
    with self.openai.beta.threads.runs.stream(
        thread_id=self.thread.id,
        assistant_id=self.assistant_id,
        event_handler=EventHandler(self.openai, callback)
    ) as stream:
        stream.until_done()
STEP 6: Create and retrieve thread methods for handling threads.
def create_thread(self):
    """Start a new conversation thread, make it current, and return it."""
    new_thread = self.openai.beta.threads.create()
    self.thread = new_thread
    return new_thread
def retrieve_thread(self, thread_id):
    """Fetch an existing thread by id, make it current, and return it."""
    existing = self.openai.beta.threads.retrieve(thread_id)
    self.thread = existing
    return existing
STEP 7: Retrieve and display previous messages in the thread.