Last week I was trying to upload and parse a big CSV blobstore to Google App Engine (GAE) using python. If you are familiar with GAE you will probably know already that uploading files is not allowed in the standard way, as the application is running in a sandbox within a read-only system. Therefore, if you need to upload something, you have to use the blobstore API provided by google.
In order to do that you need to do the following steps:
1. Request an upload url (somewhere in your app):
from google.appengine.ext import blobstore url = blobstore.create_upload_url('/destination_after_upload')
This is the url where you have to submit the form with the file and (optionally) any additional data. Once the submission of the form has been completed, the request will be automatically redirected to the provided handler/url (in the example: /destination_after_upload) with all the form data plus the blobstore information.
Note: Everywhere I’ve seen, people assume that you put the generated url in the action of a form in your template/view. However, it is possible for instance to return the url, and then perform a POST request to it programatically, and it will work as well. Example:
#The result of this request will be just the url generated by the previous sample code upload_url=`curl http://your.appengine.site/get_upload_url/` #We can curl-post a file to that url, and it will work exactly the same as if we were submitting a form through the browser. curl -vX POST -F "file=@/path/to/your/file.csv" $upload_url
2. Retrieve the data (in the destination url), for this I’ve used the helper get_uploads:
def destination_after_upload(request): # Note that 'file' is the name of the file element used in the form upload_files = get_uploads(request, field_name='file', populate_post=True) blob = upload_files[0]
3. Parse the CSV file: For this I tried reading the file normally with the csv module, but after facing some issues with the new line characters I ended up using the class BlobIterator and it was working like a charm. Then, I started noticing some random error in the logs due some line of the CSV being corrupted. I was using the following code to parse the CSV:
# Read CSV content from Blobstore blob_reader = blobstore.BlobReader(blob.key()) blob_iterator = BlobIterator(blob_reader) reader = csv.reader(blob_iterator, skipinitialspace=True, delimiter=',') # headers = next(reader) for row in reader: print row # do something... blobstore.delete(blob.key())
It was all working fine except for some random lines, but I checked the original CSV file and the lines were correct there. I started debugging the execution with PyCharm and I noticed that the issue seemed to be in the next method of the class BlobIterator. Let’s have a look at the code:
def next(self): if not self.buffer or len(self.lines) == self.line_num: self.buffer = self.blob_reader.read(1048576) # 1MB buffer self.lines = self.buffer.splitlines() self.line_num = 0 # Handle special case where our block just happens to end on a new line if self.buffer[-1:] == "\n" or self.buffer[-1:] == "\r": self.lines.append("") if not self.buffer: raise StopIteration if self.line_num == 0 and len(self.last_line) > 0: result = self.last_line + self.lines[self.line_num] + "\n" else: result = self.lines[self.line_num] + "\n" self.last_line = self.lines[self.line_num] self.line_num += 1 return result
I placed a break point in side the first “if” statement, which is the point where it loads one megabyte of data into an internal buffer. The way it works is, summarizing: Load one megabyte of data, split it by the newlines and keep a counter of the current line, returning it on each “next” iteration. The problem is that by loading one megabyte of data, it’s very likely that the last line will be cut in the middle, so you have to concatenate the last line of the first buffer with the first line of the next one, in order to return the proper result. However, the code was already doing it, and the debugger was showing the proper value on the “result” variable, but then when the execution was going back to my “for” loop, the value of the row was actually like this: “last,part,of,first,buffer\n,last,part,of,first,buffer,first,part,next,buffer”
Apparently, it was returning the proper line but prepended by the residual content of the first buffer plus a \n. I was going crazy trying to find out the problem, I even thought that there was an issue on the csv reader module, until I tried to reproduce the issue in a simpler sample:
import unittest import csv class TestIterator: def __init__(self, data): self.data = data self.last_line = "" self.line_num = 0 self.lines = [] self.buffer = None def __iter__(self): return self def next(self): if not self.buffer or len(self.lines) == self.line_num: self.buffer = self.data[:15] self.data = self.data[15:] self.lines = self.buffer.splitlines() self.line_num = 0 # Handle special case where our block just happens to end on a new line if self.buffer[-1:] == "\n" or self.buffer[-1:] == "\r": self.lines.append("") if not self.buffer: raise StopIteration if self.line_num == 0 and len(self.last_line) > 0: result = self.last_line + self.lines[self.line_num] + "\n" else: result = self.lines[self.line_num] + "\n" self.last_line = self.lines[self.line_num] self.line_num += 1 return result class IteratorTest(unittest.TestCase): def setUp(self): # Read CSV content from string data data = """1,2,3,4,5 1,2,3,4,5 1,2,3,4,5 1,2,3,4,5 1,2,3,4,5 1,2,3,4,5 1,2,3,4,5 """ self.test_iterator = TestIterator(data) def test_iterator(self): reader = csv.reader(self.test_iterator, skipinitialspace=True, delimiter=',') for row in reader: print row if row: self.assertEqual(row, ['1', '2', '3', '4', '5']) def main(): unittest.main() if __name__ == '__main__': main()
And with this code I was able to reproduce the issue. It turns out that the problem is in the line_num counter that starts from zero and hence it was loading the next buffer one iteration later than it should. I managed to fix it with the following changes:
def next(self): if not self.buffer or len(self.lines) == (self.line_num + 1): if self.lines: self.last_line = self.lines[self.line_num] self.buffer = self.data[:15] self.data = self.data[15:] self.lines = self.buffer.splitlines() self.line_num = 0 print self.lines # Handle special case where our block just happens to end on a new line if self.buffer[-1:] == "\n" or self.buffer[-1:] == "\r": self.lines.append("") if not self.buffer: raise StopIteration if self.line_num == 0 and len(self.last_line) > 0: print 'fixing' result = self.last_line + self.lines[self.line_num] + "\n" else: print self.line_num print len(self.lines) result = self.lines[self.line_num] + "\n" self.line_num += 1 return result
Note that the first three lines are different and the line “self.last_line = self.lines[self.line_num]” has been moved inside the if. With this changes the iterator seems to work fine when changing between buffer iterations, so the fixed/final version of the BlobstoreIterator is:
class BlobIterator: """Because the python csv module doesn't like strange newline chars and the google blob reader cannot be told to open in universal mode, then we need to read blocks of the blob and 'fix' the newlines as we go. Fixed the problem with the corrupted lines when fetching new data into the buffer.""" def __init__(self, blob_reader): self.blob_reader = blob_reader self.last_line = "" self.line_num = 0 self.lines = [] self.buffer = None def __iter__(self): return self def next(self): if not self.buffer or len(self.lines) == self.line_num + 1: if self.lines: self.last_line = self.lines[self.line_num] self.buffer = self.blob_reader.read(1048576) # 1MB buffer self.lines = self.buffer.splitlines() self.line_num = 0 # Handle special case where our block just happens to end on a new line if self.buffer[-1:] == "\n" or self.buffer[-1:] == "\r": self.lines.append("") if not self.buffer: raise StopIteration if self.line_num == 0 and len(self.last_line) > 0: result = self.last_line + self.lines[self.line_num] + "\n" else: result = self.lines[self.line_num] + "\n" self.line_num += 1 return result
I hope it helps you to save a headache if you are having the same issue.