GAE – Reading a big CSV from the Blobstore in Python

Last week I was trying to upload a big CSV file to Google App Engine (GAE) and parse it using Python. If you are familiar with GAE you probably already know that uploading files the standard way is not allowed, as the application runs in a sandbox on a read-only file system. Therefore, if you need to upload something, you have to use the Blobstore API provided by Google.

In order to do that, you need to follow these steps:

1. Request an upload URL (somewhere in your app):

from google.appengine.ext import blobstore
url = blobstore.create_upload_url('/destination_after_upload')

This is the URL where you have to submit the form with the file and (optionally) any additional data. Once the form submission has completed, the request is automatically redirected to the provided handler/URL (in the example: /destination_after_upload) with all the form data plus the Blobstore information.
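For completeness, here is a minimal sketch of how that usually looks, assuming a Django-style view (the view name and the inline form markup are just illustrative, not part of the original code):

from django.http import HttpResponse
from google.appengine.ext import blobstore

def upload_form(request):
    # Hypothetical view: render a form whose action is a freshly generated
    # one-shot upload URL. The multipart encoding is required for file uploads.
    url = blobstore.create_upload_url('/destination_after_upload')
    return HttpResponse(
        '<form action="%s" method="POST" enctype="multipart/form-data">'
        '  <input type="file" name="file">'
        '  <input type="submit" value="Upload">'
        '</form>' % url)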

Note: Everywhere I’ve looked, people assume that you put the generated URL in the action attribute of a form in your template/view. However, it is also possible, for instance, to return the URL and then perform a POST request to it programmatically, and it will work just as well. Example:

# The result of this request will be just the URL generated by the previous sample code
upload_url=`curl http://your.appengine.site/get_upload_url/`
# We can POST a file to that URL with curl, and it will work exactly the same as if we were submitting the form through the browser
curl -vX POST -F "file=@/path/to/your/file.csv" $upload_url
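For reference, the /get_upload_url/ endpoint used above could be as simple as this (again a sketch assuming Django-style views; the endpoint itself is not part of the GAE API):

from django.http import HttpResponse
from google.appengine.ext import blobstore

def get_upload_url(request):
    # Return the one-shot upload URL as plain text so curl can capture it
    return HttpResponse(blobstore.create_upload_url('/destination_after_upload'),
                        content_type='text/plain')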

2. Retrieve the data (in the destination URL). For this I’ve used the get_uploads helper:

def destination_after_upload(request):
    # Django-style view mapped to the URL we passed to create_upload_url.
    # Note that 'file' is the name of the file field used in the form
    upload_files = get_uploads(request, field_name='file', populate_post=True)
    blob = upload_files[0]
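Note that get_uploads is not part of the GAE SDK; it comes from a well-known helper snippet for using the Blobstore from Django. In case you don’t have it at hand, a minimal sketch of what it does could look like this (ignoring populate_post for brevity; parse_blob_info is part of the official blobstore module):

import cgi
from google.appengine.ext import blobstore

def get_uploads(request, field_name=None, populate_post=False):
    # The Blobstore upload service replaces each uploaded file with a MIME
    # part whose content-type carries a 'blob-key' parameter; collect those
    # parts and turn them into BlobInfo objects
    request.META['wsgi.input'].seek(0)
    fields = cgi.FieldStorage(request.META['wsgi.input'], environ=request.META)
    uploads = []
    for key in fields.keys():
        field = fields[key]
        if isinstance(field, cgi.FieldStorage) and 'blob-key' in field.type_options:
            if field_name is None or key == field_name:
                uploads.append(blobstore.parse_blob_info(field))
    return uploads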

3. Parse the CSV file. For this I first tried reading the file normally with the csv module, but after facing some issues with the newline characters I ended up using the BlobIterator class, which worked like a charm. Then I started noticing some random errors in the logs, due to some lines of the CSV being corrupted. I was using the following code to parse the CSV:

    # Read CSV content from Blobstore
    blob_reader = blobstore.BlobReader(blob.key())
    blob_iterator = BlobIterator(blob_reader)
    reader = csv.reader(blob_iterator, skipinitialspace=True, delimiter=',')

    # headers = next(reader)
    for row in reader:
        print row
        # do something...
    blobstore.delete(blob.key())

It was all working fine except for some random lines, but I checked the original CSV file and the lines were correct there. I started debugging the execution with PyCharm, and I noticed that the issue seemed to be in the next method of the BlobIterator class. Let’s have a look at the code:

    def next(self):
        if not self.buffer or len(self.lines) == self.line_num:
            self.buffer = self.blob_reader.read(1048576) # 1MB buffer
            self.lines = self.buffer.splitlines()
            self.line_num = 0

            # Handle special case where our block just happens to end on a new line
            if self.buffer[-1:] == "\n" or self.buffer[-1:] == "\r":
                self.lines.append("")

        if not self.buffer:
            raise StopIteration

        if self.line_num == 0 and len(self.last_line) > 0:
            result = self.last_line + self.lines[self.line_num] + "\n"
        else:
            result = self.lines[self.line_num] + "\n"

        self.last_line = self.lines[self.line_num]
        self.line_num += 1

        return result

I placed a breakpoint inside the first “if” statement, which is the point where it loads one megabyte of data into an internal buffer. Summarizing, the way it works is: load one megabyte of data, split it by newlines, and keep a counter of the current line, returning one line on each “next” iteration. Because it loads exactly one megabyte of data, it is very likely that the last line of the buffer will be cut in the middle, so you have to concatenate the last line of the first buffer with the first line of the next one in order to return the proper result. However, the code was already doing that, and the debugger was showing the proper value in the “result” variable; but when the execution went back to my “for” loop, the value of the row actually looked like this: “last,part,of,first,buffer\n,last,part,of,first,buffer,first,part,next,buffer”
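To make the buffer-splitting-and-stitching idea concrete before digging into the bug, here is the mechanism in isolation (toy chunk sizes instead of one megabyte):

# A read boundary can fall in the middle of a CSV line:
chunk1 = "1,2,3\n4,5,"        # first buffer, cut mid-line
chunk2 = "6\n7,8,9\n"         # second buffer

lines1 = chunk1.splitlines()  # ['1,2,3', '4,5,'] -> the last entry is partial
lines2 = chunk2.splitlines()  # ['6', '7,8,9']

# The partial tail of one buffer has to be glued to the head of the next:
print lines1[-1] + lines2[0]  # prints '4,5,6', the real row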

Apparently, it was returning the proper line, but prepended with the residual content of the first buffer plus a \n. I was going crazy trying to find the problem; I even thought there was an issue in the csv reader module, until I tried to reproduce the issue in a simpler sample:

import unittest
import csv

class TestIterator:
    def __init__(self, data):
        self.data = data
        self.last_line = ""
        self.line_num = 0
        self.lines = []
        self.buffer = None

    def __iter__(self):
        return self

    def next(self):
        if not self.buffer or len(self.lines) == self.line_num:
            # Simulate blob_reader.read() with a tiny 15-byte buffer
            self.buffer = self.data[:15]
            self.data = self.data[15:]
            self.lines = self.buffer.splitlines()
            self.line_num = 0

            # Handle special case where our block just happens to end on a new line
            if self.buffer[-1:] == "\n" or self.buffer[-1:] == "\r":
                self.lines.append("")

        if not self.buffer:
            raise StopIteration

        if self.line_num == 0 and len(self.last_line) > 0:
            result = self.last_line + self.lines[self.line_num] + "\n"
        else:
            result = self.lines[self.line_num] + "\n"

        self.last_line = self.lines[self.line_num]
        self.line_num += 1

        return result

class IteratorTest(unittest.TestCase):

    def setUp(self):
        # Read CSV content from string data
        data = """1,2,3,4,5
1,2,3,4,5
1,2,3,4,5
1,2,3,4,5
1,2,3,4,5
1,2,3,4,5
1,2,3,4,5
"""
        self.test_iterator = TestIterator(data)

    def test_iterator(self):
        reader = csv.reader(self.test_iterator, skipinitialspace=True, delimiter=',')
        for row in reader:
            print row
            if row:
                self.assertEqual(row, ['1', '2', '3', '4', '5'])

def main():
    unittest.main()

if __name__ == '__main__':
    main()

And with this code I was able to reproduce the issue. It turns out that the problem is in the line_num counter: since it starts from zero, the next buffer was being loaded one iteration later than it should. I managed to fix it with the following changes:

    def next(self):
        if not self.buffer or len(self.lines) == (self.line_num + 1):
            if self.lines:
                self.last_line = self.lines[self.line_num]
            self.buffer = self.data[:15]
            self.data = self.data[15:]
            self.lines = self.buffer.splitlines()
            self.line_num = 0
            print self.lines

            # Handle special case where our block just happens to end on a new line
            if self.buffer[-1:] == "\n" or self.buffer[-1:] == "\r":
                self.lines.append("")

        if not self.buffer:
            raise StopIteration

        if self.line_num == 0 and len(self.last_line) > 0:
            print 'fixing'
            result = self.last_line + self.lines[self.line_num] + "\n"
        else:
            print self.line_num
            print len(self.lines)
            result = self.lines[self.line_num] + "\n"

        self.line_num += 1

        return result

Note that the first three lines are different, and that the line “self.last_line = self.lines[self.line_num]” has been moved inside the if. With these changes the iterator works fine when crossing buffer boundaries, so the fixed/final version of the BlobIterator is:

class BlobIterator:
    """Because the python csv module doesn't like strange newline chars and
    the google blob reader cannot be told to open in universal mode, then
    we need to read blocks of the blob and 'fix' the newlines as we go.
    Fixed the problem with the corrupted lines when fetching new data into the buffer."""

    def __init__(self, blob_reader):
        self.blob_reader = blob_reader
        self.last_line = ""
        self.line_num = 0
        self.lines = []
        self.buffer = None

    def __iter__(self):
        return self

    def next(self):
        if not self.buffer or len(self.lines) == self.line_num + 1:
            if self.lines:
                self.last_line = self.lines[self.line_num]
            self.buffer = self.blob_reader.read(1048576) # 1MB buffer
            self.lines = self.buffer.splitlines()
            self.line_num = 0

            # Handle special case where our block just happens to end on a new line
            if self.buffer[-1:] == "\n" or self.buffer[-1:] == "\r":
                self.lines.append("")

        if not self.buffer:
            # Edge case: if the blob does not end with a newline, the pending
            # last_line still has to be flushed before stopping
            if self.last_line:
                result = self.last_line + "\n"
                self.last_line = ""
                return result
            raise StopIteration

        if self.line_num == 0 and len(self.last_line) > 0:
            result = self.last_line + self.lines[self.line_num] + "\n"
        else:
            result = self.lines[self.line_num] + "\n"

        self.line_num += 1
        return result
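As a quick sanity check, the fixed iterator can be exercised outside GAE by passing any file-like object with a read(size) method in place of the BlobReader, for example:

import csv
from StringIO import StringIO

# StringIO stands in for blobstore.BlobReader; both expose read(size)
fake_blob_reader = StringIO("a,b,c\n" * 500000)  # ~3MB, spans several buffers
reader = csv.reader(BlobIterator(fake_blob_reader), skipinitialspace=True, delimiter=',')
for row in reader:
    assert row == ['a', 'b', 'c']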

I hope this saves you a headache if you are facing the same issue.