Parsing ReceivingProgressed event for URLConnection

OK, this is probably user error and a mistake on my part, but I need some help please…

I’m working on adding support for Gemini LLMs to AIKit and have hit an issue.

AIKit communicates with LLMs through various vendor APIs, essentially using a URLConnection. Specifically, the URLConnection makes a GET request to the API and expects a streaming JSON response. For Anthropic, OpenAI and Ollama I had been using the URLConnection.ReceivingProgressed event to handle the streaming.

The structure of the data received varies by vendor, but typically it is in a format similar to:

data: {SOME VALID JSON}

data: {MORE JSON}

I would split the string in the ReceivingProgressed event by a delimiter and handle each individual JSON string separately. No issues.
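
For reference, the per-chunk handling looked roughly like this (a simplified sketch, not the actual AIKit code; it assumes each complete entry arrives as its own “data: {…}” line):

// Simplified sketch of the per-chunk handling (not the actual AIKit code).
// In URLConnection.ReceivingProgressed(bytesReceived, totalBytes, newData):
Var pieces() As String = newData.Split(EndOfLine)
For Each piece As String In pieces
  Var entry As String = piece.Trim
  If entry.BeginsWith("data:") Then
    Var parsed As Variant = ParseJSON(entry.Middle(5).Trim)
    // ... append the streamed token(s) to the UI here
  End If
Next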

With the Gemini API, however, the string passed into the ReceivingProgressed event seems to be a carved-up JSON string. The first time the event fires I might get the opening part of the JSON; the next time it fires I get a continuation of that JSON object, and then maybe the start of another.

I don’t know how best to handle this - does anyone have a recommendation?

I considered using the ContentReceived event of the URLConnection class but if I do that I essentially lose the ability to display streaming responses.

So you need a way of checking for valid JSON in the event, right? The first quick hack that comes to mind is to put ParseJSON in a Try-Catch.
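
Something along these lines, perhaps (a rough sketch; mBuffer and HandleResponse are hypothetical names, and the buffer accumulates the incoming chunks):

// Rough sketch: accumulate the stream and only act once ParseJSON succeeds.
mBuffer = mBuffer + newData

Try
  Var parsed As Variant = ParseJSON(mBuffer)
  // Parsed cleanly: handle the complete JSON and clear the buffer.
  HandleResponse(parsed) // hypothetical method
  mBuffer = ""
Catch e As JSONException
  // Not valid (yet) - keep accumulating and try again on the next event.
End Try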

Hi Garry,

In the case where the entire contents of a response come in over several ‘bursts’, maybe use the URLConnection.ContentReceived event instead? My brain always defaults back to RS232 and buffering of Serial data - I realize that this is different. In Serial the messages always had a delimiter that marked the end of the packet (CRLF for instance). In JSON it’s hard to spot, unless you know what the tail end of a complete message looks like.

This is the reason I do this stuff in a thread. The thread has a method to get the next “item” (in my case, a line of text terminated by NewLine, in yours a JSON item). If the method determines there’s no more data, but the item accumulated so far is incomplete, it pauses the thread, and the thread is later resumed by the DataAvailable event handler once more data arrives. The method thus continues, doing a .ReadAll as its first action after being resumed. If a complete item can then be returned, it does so, otherwise it may have to do the pause/resume again.

This means that the possibly intermittent arrival of data is hidden completely from the code calling for an item, which can then just assume that it asks for an item, it gets an item.
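
Roughly, the shape of it is something like this (a simplified sketch assuming a TCPSocket-style connection; mBuffer, Socket and WorkerThread are placeholder names):

// The worker thread calls NextItem; if no complete item is buffered yet,
// the thread pauses itself and the socket's DataAvailable handler resumes it.
Function NextItem() As String
  Do
    mBuffer = mBuffer + Socket.ReadAll
    Var delim As String = EndOfLine  // or whatever terminates an item
    Var pos As Integer = mBuffer.IndexOf(delim)
    If pos >= 0 Then
      Var item As String = mBuffer.Left(pos)
      mBuffer = mBuffer.Middle(pos + delim.Length)
      Return item
    End If
    WorkerThread.Pause  // incomplete item: wait for more data
  Loop
End Function

// In the socket's DataAvailable event handler:
//   WorkerThread.Resume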

Do a test and dump all the segments and as much of the data as possible for deeper analysis.

Include the headers, the date/time each chunk arrived, and the HTTP status.

After that we will have some material to better understand whether the API is saying something like “all the content is here”, “this is a part, more will follow”, or “this is the last part, join all the parts together”.
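
For example, something like this in the URLConnection event handlers (just a quick diagnostic sketch):

// In URLConnection.HeadersReceived(URL As String, HTTPStatus As Integer):
System.DebugLog("HTTP status: " + HTTPStatus.ToString)

// In URLConnection.ReceivingProgressed(bytesReceived, totalBytes, newData):
System.DebugLog(DateTime.Now.SQLDateTime + " - chunk of " + newData.Bytes.ToString + " bytes")
System.DebugLog(newData)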

These are all good suggestions.

I think I’m going to have to start dumping the contents of a variety of requests to see what the pattern is, as it’s not easy to see at the moment.

Very frustrating because the data that arrives in pieces from all other providers always seems to be complete JSON but not from Gemini. I wonder why that is?

Sounds like the response is being chunked, so, as Rick suggests, add each part to a String array, then String.FromArray them together at the end (using an empty string as the delimiter). In theory you’ll have valid JSON text after joining.

Edited to add: Watch that the ReceivingProgressed event is not capturing any unnecessary line endings (the delimiter for chunked content).
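
Something like this, perhaps (a sketch; mParts and UpdateStreamingUI are placeholder names):

// Property on the class driving the request:
//   mParts() As String

// In URLConnection.ReceivingProgressed(bytesReceived, totalBytes, newData):
mParts.Add(newData)        // keep the raw pieces for joining later
UpdateStreamingUI(newData) // placeholder: show the partial text as it arrives

// In URLConnection.ContentReceived(URL, HTTPStatus, content):
Var fullText As String = String.FromArray(mParts, "")
Var parsed As Variant = ParseJSON(fullText) // should now be complete JSON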

@GarryPettet - first of all, don’t do any processing of data in the ReceivingProgressed event itself. Events like that on sockets are reentrant and you could end up having your app trying to process the same data in two places at the same time.

I find that the simplest way to deal with protocols like this is to use a custom MemoryBlock subclass where you can track both reading from the middle and writing to the end using a BinaryStream.

Add a property so you can know if processing is already in progress:

mProcessing as Boolean

Add a process queue:
processQueue() as String

ReceivingProgressed simply becomes:

// Append the new data to the end of the buffer...
MyData.Position = MyData.Length
MyData.Write(newData)
// ...then process it outside of the event to avoid reentrancy
Timer.CallLater(10, AddressOf processData)

The processData method: (this is gonna be pseudo-code)

If not mProcessing then
    mProcessing = True

    While not at end of data
      // starting at where we left off reading, 
      // look ahead for the next instance of "data:"
      // if found, grab the part before it for processing
      // add block to processQueue
      // copy everything after to a new memoryblock
      // reset read position to 0
      // use timer or threads to process items from 
      // processQueue

      // if not found, record data length
      // minus length of the delimiter (5)
      // exit while
      
    Wend
    mProcessing = false
End If
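
To make the idea a bit more concrete, a plain String-buffer version of that loop might look something like this (just an illustration of the idea, not the MemoryBlock version; the names are made up, and ReceivingProgressed would append newData to mBuffer):

If Not mProcessing Then
  mProcessing = True

  Var marker As String = "data:"
  // mBuffer begins with a "data:" marker on the very first chunk;
  // drop it so we can search for the start of the NEXT entry.
  If mBuffer.BeginsWith(marker) Then mBuffer = mBuffer.Middle(marker.Length)

  Var nextEntry As Integer = mBuffer.IndexOf(marker)
  While nextEntry >= 0
    processQueue.Add(mBuffer.Left(nextEntry).Trim)       // one complete item
    mBuffer = mBuffer.Middle(nextEntry + marker.Length)  // keep the remainder
    nextEntry = mBuffer.IndexOf(marker)
  Wend
  // Anything left in mBuffer is an incomplete item; a later
  // ReceivingProgressed call will finish it.

  mProcessing = False
End If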

I can show you for real later if there’s still confusion, but I haven’t had my morning coffee yet.

I agree here, and I missed that in the OP’s request.

My initial (mis)understanding was that he was receiving a long chunked JSON response firing multiple ContentReceived events.

ReceivingProgressed should be used for informational purposes only.

Thank you for the feedback everyone.

Urgh - looks like I am going to have to do some refactoring if I want to be able to support streaming properly. I guess I’ve just been lucky up until this point.

I’ll report back…

If data is coming in quickly I think this can result in a whole bunch of Timers being instantiated. I use one timer in Single mode and reset it each time data is received so it doesn’t fire until there’s a pause in the data.
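
Something like this (a sketch; mProcessTimer is a hypothetical single-shot Timer whose event handler calls the processing method):

// In ReceivingProgressed: append the chunk, then re-arm the single-shot
// timer so processing only runs once there's a pause in the incoming data.
MyData.Position = MyData.Length
MyData.Write(newData)
mProcessTimer.RunMode = Timer.RunModes.Single
mProcessTimer.Reset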

That’s a good point. I wrote it this way in the post for expediency.