Python – Processing a huge file (9.1GB) and processing it faster — Python

performancepython

I have a 9GB text file of tweets in the following format:

T      'time and date'
U      'name of user in the form of a URL'
W      Actual tweet

There are in total 6,000,000 users and more than 60,000,000 tweets. I read 3 lines at a time using itertools.izip() and then according to the name, write it into a file. But its taking way too long (26 hours and counting). How can this be made faster?

Posting code for completeness,

s='the existing folder which will have all the files'
with open('path to file') as f:
 for line1,line2,line3 in itertools.izip_longest(*[f]*3):
            if(line1!='\n' and line2!='\n' and line3!='\n'):
     line1=line1.split('\t')
     line2=line2.split('\t')
     line3=line3.split('\t')
     if(not(re.search(r'No Post Title',line1[1]))):
         url=urlparse(line3[1].strip('\n')).path.strip('/')

  if(url==''):
   file=open(s+'junk','a')
   file.write(line1[1])
   file.close()
  else:
   file=open(s+url,'a')
   file.write(line1[1])
   file.close()

My aim is to use topic modeling on the small texts (as in, running lda on all the tweets of one user, thus requiring a separate file for each user), but its taking way too much time.

UPDATE: I used the suggestions by user S.Lott and used the following code :

import re
from urlparse import urlparse
import os 
def getUser(result):
    result=result.split('\n')
    u,w=result[0],result[1]
    path=urlparse(u).path.strip('/')
    if(path==''):
        f=open('path to junk','a')
        f.write('its Junk !!')
        f.close()
    else:
        result="{0}\n{1}\n{2}\n".format(u,w,path)
        writeIntoFile(result)
def writeIntoFile(result):
    tweet=result.split('\n')
    users = {}
    directory='path to directory'
    u, w, user = tweet[0],tweet[1],tweet[2]
    if user not in users :
        if(os.path.isfile(some_directory+user)):
            if(len(users)>64):
                lru,aFile,u=min(users.values())
                aFile.close()
                users.pop(u)
            users[user]=open(some_directory+user,'a')
            users[user].write(w+'\n')
            #users[user].flush
        elif (not(os.path.isfile(some_directory+user))):
            if len(users)>64:
                lru,aFile,u=min(users.values())
                aFile.close()
                users.pop(u)

            users[user]=open(some_directory+user,'w')
            users[user].write(w+'\n')
    for u in users:
        users[u].close()
import sys
s=open(sys.argv[1],'r')
tweet={}
for l in s:
    r_type,content=l.split('\t')
    if r_type in tweet:
    u,w=tweet.get('U',''),tweet.get('W','')
            if(not(re.search(r'No Post Title',u))):
                result="{0}{1}".format(u,w)
                getUser(result)
                tweet={}
        tweet[r_type]=content

Obviously, it is pretty much a mirror of what he suggested and kindly shared too. Initially the speed was very fast but then it has got slower . I have posted the updated code so that i can get some more suggestions on how it could have been made faster. If i was reading from sys.stdin, then there was an import error which could not be resolved by me. So, to save time and get on with it, i simply used this , hoping that it works and does so correctly. Thanks.

Best Answer

This is why your OS has multiprocessing pipelines.

collapse.py sometweetfile | filter.py | user_id.py | user_split.py -d some_directory

collapse.py

import sys
with open("source","r") as theFile:
    tweet = {}
    for line in theFile:
        rec_type, content = line.split('\t')
        if rec_type in tweet:
            t, u, w = tweet.get('T',''), tweet.get('U',''), tweet.get('W','')
            result=  "{0}\t{1}\t{2}".format( t, u, w )
            sys.stdout.write( result )
            tweet= {}
        tweet[rec_type]= content
    t, u, w = tweet.get('T',''), tweet.get('U',''), tweet.get('W','')
    result=  "{0}\t{1}\t{2}".format( t, u, w )
    sys.stdout.write( result )

filter.py

import sys
for tweet in sys.stdin:
    t, u, w = tweet.split('\t')
    if 'No Post Title' in t:
        continue
    sys.stdout.write( tweet )

user_id.py

import sys
import urllib
for tweet in sys.stdin:
    t, u, w = tweet.split('\t')
    path=urlparse(w).path.strip('/')
    result= "{0}\t{1}\t{2}\t{3}".format( t, u, w, path )
    sys.stdout.write( result )

user_split.py

users = {}
for tweet in sys.stdin:
    t, u, w, user = tweet.split('\t')
    if user not in users:
        # May run afoul of open file limits...
        users[user]= open(some_directory+user,"w")
    users[user].write( tweet )
    users[user].flush( tweet )
for u in users:
    users[u].close()

Wow, you say. What a lot of code.

Yes. But. It spreads out among ALL the processing cores you own and it all runs concurrently. Also, when you connect stdout to stdin through a pipe, it's really only a shared buffer: there's no physical I/O occurring.

It's amazingly fast to do things this way. That's why the *Nix operating systems work that way. This is what you need to do for real speed.


The LRU algorithm, FWIW.

    if user not in users:
        # Only keep a limited number of files open
        if len(users) > 64: # or whatever your OS limit is.
            lru, aFile, u = min( users.values() )
            aFile.close()
            users.pop(u)
        users[user]= [ tolu, open(some_directory+user,"w"), user ]
    tolu += 1
    users[user][1].write( tweet )
    users[user][1].flush() # may not be necessary
    users[user][0]= tolu