Streaming JSON with Flask

Update March 2021

This post still gets a fair few hits, so I want to preface this by saying that I wrote it some time ago and probably wouldn’t do the same thing today. The main problem is that I ignored the client! Sending a single whole JSON object in this way means the client has to reassemble the whole thing in memory anyway, which somewhat defeats the point, unless you have fat clients and very thin servers (or processing is slow and you have a very short socket timeout). You might have a valid use-case for a hack like this, but if I were to solve this problem again, I’d send newline-delimited JSON instead (with metadata in headers or on a separate endpoint if necessary).
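For the record, the newline-delimited version is barely more code than the approach below. This is a minimal sketch using plain dicts in place of ORM objects; the `application/x-ndjson` content type is a common convention, not something Flask mandates:

```python
import json

def generate_ndjson(rows):
    # One complete JSON document per line; the client can parse each
    # line as soon as it arrives, with no trailing-comma bookkeeping.
    for row in rows:
        yield json.dumps(row) + "\n"

# In a Flask view this would be wrapped as
# Response(generate_ndjson(query), content_type='application/x-ndjson')
lines = list(generate_ndjson([{"id": 1}, {"id": 2}]))
parsed = [json.loads(line) for line in lines]
```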

I have a SQLAlchemy query (able to behave as an iterator) which could return a large result set. The first version of the code was very simple: Release objects have a to_dict() function which returns a dictionary, so I append each to a list and jsonify the result:

# releases = <SQLAlchemy query object>

output = []
for r in releases:
    output.append(r.to_dict())

return jsonify(releases=output), 200

(context on github)

This result set could potentially grow to a point where fitting it in memory would be impractical – with only a thousand releases there is already a significant lag before we start getting results.

Unfortunately, Flask’s jsonify() function doesn’t support streaming, so we have to do it manually as described in the Flask documentation. I thus came up with a simple generator like so:

# query = <something>

def generate():
    yield '{"releases": ['
    for release in query:
        yield json.dumps(release.to_dict()) + ', '
    yield ']}'

return Response(generate(), content_type='application/json')

The problem is that trying to json.loads() the output of this results in “ValueError: No JSON object could be decoded”, because the last element in the list is followed by a trailing comma. No .join() for us!
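The failure is easy to reproduce by hand – for two releases the generator emits a body ending in `, ]}`, and the trailing comma makes the whole document invalid:

```python
import json

# What the naive generator emits for two releases:
body = '{"releases": [' + '{"id": 1}, ' + '{"id": 2}, ' + ']}'

try:
    json.loads(body)
except ValueError:
    # json.JSONDecodeError (a ValueError subclass) on Python 3
    print("invalid JSON")
```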

Thus we need to detect the last iteration, and omit the comma.

How does one do this? I found a handy answer on stackoverflow, which describes using what is called a “lagging generator”. On each iteration we yield the previous item, which lets us look one element ahead.

So I modified the generator, and came up with the following:

def generate():
    releases = iter(query)
    prev_release = next(releases)  # get first result

    yield '{"releases": ['

    # Iterate over the releases
    for release in releases:
        yield json.dumps(prev_release.to_dict()) + ', '
        prev_release = release

    # Now yield the last iteration without comma but with the closing brackets
    yield json.dumps(prev_release.to_dict()) + ']}'

Now we can detect the last iteration and omit the comma, substituting the closing brackets instead.

There’s just one problem. When the query result is empty (a perfectly reasonable situation), the first next(releases) call will raise StopIteration before we’ve emitted any JSON – and on Python 3.7+, PEP 479 turns an uncaught StopIteration inside a generator into a RuntimeError. Either way, code that expects a valid JSON document will fail.

The solution is therefore to catch the first StopIteration, yield a valid “empty” JSON result set, and return from the generator. The final solution is thus:

def generate():
    """
    A lagging generator to stream JSON so we don't have to hold everything in memory

    This is a little tricky, as we need to omit the last comma to make valid JSON,
    thus we use a lagging generator, similar to http://stackoverflow.com/questions/1630320/
    """
    releases = iter(query)
    try:
        prev_release = next(releases)  # get first result
    except StopIteration:
        # Empty result: yield a valid "empty" releases doc and end the generator
        # (a bare return – re-raising StopIteration here would become a
        # RuntimeError under PEP 479 on Python 3.7+)
        yield '{"releases": []}'
        return

    # We have some releases. First, yield the opening json
    yield '{"releases": ['

    # Iterate over the releases
    for release in releases:
        yield json.dumps(prev_release.to_dict()) + ', '
        prev_release = release

    # Now yield the last iteration without comma but with the closing brackets
    yield json.dumps(prev_release.to_dict()) + ']}'

return Response(generate(), content_type='application/json')

(github link)
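For what it’s worth, the generator’s behaviour is easy to verify outside Flask by driving it directly – here plain dicts stand in for the ORM objects, and `rows` replaces the view’s `query`:

```python
import json

def generate(rows):
    it = iter(rows)
    try:
        prev = next(it)
    except StopIteration:
        # Empty result set: emit a complete, valid document
        yield '{"releases": []}'
        return
    yield '{"releases": ['
    for row in it:
        yield json.dumps(prev) + ', '
        prev = row
    # Last item gets the closing brackets instead of a comma
    yield json.dumps(prev) + ']}'

# Joining the chunks always yields valid JSON:
empty = json.loads(''.join(generate([])))
two = json.loads(''.join(generate([{"id": 1}, {"id": 2}])))
```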

6 thoughts on “Streaming JSON with Flask”

  1. Sven

    That was a very helpful post to quickly turn around a feature I’d think would take much longer. Thanks for sharing!

    I’ve adjusted the logic for the iterator a bit to send the first item, then subsequently send ',' + json(next item) (i.e. prefix the comma for sends after the first). I find that a bit clearer, and it avoids having to keep the previous item around:

    def stream():
        yield '['

        reports = iter(query)
        try:
            r = next(reports)
            yield json.dumps(to_dict(r), default=date_encoder)
        except StopIteration:
            # no results – close array and end the generator
            yield ']'
            return

        # loop over remaining results
        for r in reports:
            yield ',' + json.dumps(to_dict(r), default=date_encoder)

        # close array
        yield ']'

    return Response(stream(), content_type='application/json')

    Note I’m just sending a straight array instead of a dictionary containing an array, so the opening and closing elements are slightly different. I.e. replace `'['` with `'{"releases": ['` and `']'` with `']}'`.

    1. Alex Forbes Post author

      Hi Sven glad it was useful. This looks like a solid improvement, I’ll probably integrate it into my implementation next time I work on it. Shame about the formatting but I’ll manage!

  2. Rohith

    Thank you, Alex! This was really helpful :)

    I used a simple custom function to encode the date for json.

    def json_serial(obj):
        """JSON serializer for objects not serializable by default json code"""
        if isinstance(obj, datetime.datetime):
            return str(obj)
        raise TypeError("Type %s not serializable" % type(obj))

    json.dumps(to_dict(r), default=json_serial)

  3. Le Loss

    How do you requests.get(stream=True) this on the client side? Would you have to re-assemble the streamed unicode (with length given by chunk_size) into a valid json object? That sounds a bit counter-productive.

    1. Alex Forbes Post author

      This is practically invisible to the client, other than timing. It just means that the server can start transmitting without having to assemble the entire JSON blob in memory first, with the effect that the client sees a steady stream of data, rather than a long pause before the whole payload is dumped as fast as the network can transmit.

      What I did here though wasn’t brilliant (it was a while ago….) – I constructed a JSON array which isn’t technically valid JSON until the final line is sent. So the client needs to read it as a whole anyway, but the chunk size wouldn’t be a factor, and the client code would be no different to the non-chunked approach.

      Most systems doing this sort of job would send each chunk as an individually-valid JSON object, which is more reasonable with large amounts of data that would be expected to be handled individually.

      edit: The technical term for this is ndjson, or newline-delimited JSON: http://ndjson.org/
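On the client, each ndjson line parses independently, so nothing needs reassembling. A sketch of the parsing side – with requests you would feed `response.iter_lines()` (from a `stream=True` GET) to the same helper:

```python
import json

def iter_ndjson(lines):
    # Parse each newline-delimited JSON record as it arrives
    for line in lines:
        if line:  # skip keep-alive blank lines
            yield json.loads(line)

# Simulated stream; in practice this would be
# requests.get(url, stream=True).iter_lines()
stream = ['{"id": 1}', '', '{"id": 2}']
records = list(iter_ndjson(stream))
```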

