OK, this is probably user error and a mistake on my part but I need some help please…
I’m working on adding support for Gemini LLMs to AIKit and have hit an issue.
AIKit communicates with LLMs through various vendor APIs, essentially using a URLConnection. Specifically, the URLConnection makes a GET request to the API and expects a streaming response of JSON. For Anthropic, OpenAI and Ollama I had been using the URLConnection.ReceivingProgressed event to handle the streaming.
The structure of the data received varies by vendor but typically it was in a format similar to:
data: {SOME VALID JSON}
data: {MORE JSON}
I would split the string in the ReceivingProgressed event by a delimiter and handle each individual JSON string separately. No issues.
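Roughly, my handler looks like this (a simplified sketch; HandleJSONChunk stands in for my actual parsing code):

// URLConnection.ReceivingProgressed event handler (simplified)
Sub ReceivingProgressed(bytesReceived As Int64, totalBytes As Int64, newData As String)
  // Split the burst on the "data:" delimiter and handle each JSON string.
  Var parts() As String = newData.Split("data:")
  For Each part As String In parts
    Var payload As String = part.Trim
    If payload <> "" Then HandleJSONChunk(payload) // hypothetical parse method
  Next
End Sub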
With the Gemini API, however, what seems to be happening is that the string passed into the ReceivingProgressed event is a carved-up JSON string. The first time the event fires I might get the opening part of the JSON; the next time the event fires it’s a continuation of that JSON object, and then maybe the start of another.
I don’t know how best to handle this - does anyone have a recommendation?
I considered using the ContentReceived event of the URLConnection class but if I do that I essentially lose the ability to display streaming responses.
In the case where the entire contents of a response come in over several ‘bursts’, maybe use the URLConnection.ContentReceived event instead? My brain always defaults back to RS232 and buffering of serial data - I realize that this is different. In serial, the messages always had a delimiter that marked the end of the packet (CRLF, for instance). In JSON it’s hard to spot, unless you know what the tail end of a complete message looks like.
This is the reason I do this stuff in a thread. The thread has a method to get the next “item” (in my case, a line of text terminated by NewLine, in yours a JSON item). If the method determines there’s no more data, but the item accumulated so far is incomplete, it pauses the thread, and the thread is later resumed by the DataAvailable event handler once more data arrives. The method thus continues, doing a .ReadAll as its first action after being resumed. If a complete item can then be returned, it does so, otherwise it may have to do the pause/resume again.
This means that the possibly intermittent arrival of data is hidden completely from the code calling for an item, which can then just assume that it asks for an item, it gets an item.
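In rough Xojo terms the idea is something like this (property names here are illustrative, not my actual code):

// The worker thread calls this to get the next complete item.
// mSocket is the data source, mBuffer an accumulating String,
// and mWorker is the Thread itself (all assumed properties).
Function NextItem() As String
  Do
    mBuffer = mBuffer + mSocket.ReadAll // drain whatever has arrived
    Var eol As String = EndOfLine
    Var pos As Integer = mBuffer.IndexOf(eol)
    If pos > -1 Then
      // A complete, delimited item is available: return it.
      Var item As String = mBuffer.Left(pos)
      mBuffer = mBuffer.Middle(pos + eol.Length)
      Return item
    End If
    // Incomplete item: pause until DataAvailable resumes us,
    // then loop back and ReadAll again.
    mWorker.Pause
  Loop
End Function

// In the socket's DataAvailable event handler:
// mWorker.Resume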
Do a test and print every segment and as much data as possible for deeper analysis.
Include the headers, the date/time each chunk arrived, and the HTTP status of each chunk.
After that we will have some material to better understand whether the API is saying “All the content is here”, or “That’s a part, more will follow”, or “This is the last part, join all the parts”.
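Something as simple as this in ReceivingProgressed would give us that material (just an example of the kind of dump I mean):

// Log arrival time, current HTTP status and size of every chunk,
// followed by the raw chunk itself.
System.DebugLog(DateTime.Now.SQLDateTime + _
  " status=" + Me.HTTPStatusCode.ToString + _
  " bytes=" + newData.Bytes.ToString)
System.DebugLog(newData)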
I think I’m going to have to start dumping the contents from a variety of requests to see what the pattern is, as it’s not easy to appreciate at the moment.
Very frustrating because the data that arrives in pieces from all other providers always seems to be complete JSON but not from Gemini. I wonder why that is?
Sounds like the response is being chunked, so like Rick suggests, add each part to a String array, then join them together at the end with String.FromArray (using an empty string as the delimiter). In theory you’ll have valid JSON text after joining.
Edited to add: Watch that the ReceivingProgressed event is not capturing any unnecessary line endings (the delimiter for chunked content).
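Something along these lines (mParts being a String array property you add):

// In URLConnection.ReceivingProgressed: just accumulate the raw chunks.
mParts.Add(newData)

// In URLConnection.ContentReceived: the transfer is complete, so join.
Var fullJSON As String = String.FromArray(mParts, "")
// ...parse fullJSON here...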
@GarryPettet - first of all, don’t do any processing of data in the ReceivingProgressed event itself. Events like that on sockets are reentrant and you could end up having your app trying to process the same data in two places at the same time.
I find that the simplest way to deal with protocols like this is to use a custom MemoryBlock subclass where you can track both reading from the middle and writing to the end using a BinaryStream.
Add a property, say mProcessing As Boolean, so you can know if processing is already in progress:
The ProcessData method (this is rough code; treat the property names as illustrative):
If Not mProcessing Then
  mProcessing = True

  While mReadPos < mBuffer.Size
    // Starting at where we left off reading,
    // look ahead for the next instance of "data:".
    Var s As String = mBuffer.StringValue(0, mBuffer.Size)
    Var pos As Integer = s.IndexOf(mReadPos, "data:")

    If pos > -1 Then
      // Found: everything before the delimiter is a complete
      // item, so grab it (if non-empty) and queue it.
      If pos > 0 Then processQueue.Add(s.Left(pos))

      // Copy everything after the delimiter to a new MemoryBlock
      // and reset the read position to 0.
      Var rest As String = s.Middle(pos + 5)
      mBuffer = New MemoryBlock(rest.Bytes)
      mBuffer.StringValue(0, rest.Bytes) = rest
      mReadPos = 0

      // Kick off a Timer or Thread to work through processQueue.
    Else
      // Not found: remember the data length minus the length of the
      // delimiter (5), in case a partial "data:" straddles the end.
      mReadPos = Max(mBuffer.Size - 5, 0)
      Exit While
    End If
  Wend

  mProcessing = False
End If
Urgh - looks like I am going to have to do some refactoring if I want to be able to support streaming properly. I guess I’ve just been lucky up until this point.
If data is coming in quickly I think this can result in a whole bunch of Timers being instantiated. I use one timer in Single mode and reset it each time data is received so it doesn’t fire until there’s a pause in the data.
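For example (mTimer being a Timer property; the 100 ms period is just a guess at a sensible quiet interval):

// Called each time data arrives.
If mTimer = Nil Then
  mTimer = New Timer
  mTimer.RunMode = Timer.RunModes.Single
  mTimer.Period = 100 // ms of quiet before processing kicks in
  AddHandler mTimer.Run, AddressOf ProcessQueue // ProcessQueue(sender As Timer)
End If
mTimer.Reset // restart the countdown so it fires only after a pause in the data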