# -*- coding: iso-8859-1 -*-
import botpywi
import query
import wikipedia
import time
import datetime
import sys
import calendar
class UnormalizedTitle(Exception):
    """Some titles are not normalized"""
def diff_time(a, b):
    # Return the difference a - b expressed in fractional days.
    d = a - b
    diff = d.days * 86400.0
    diff += d.seconds
    diff /= 86400
    return diff
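# For instance, diff_time(datetime.datetime(2005, 1, 2, 12, 0, 0),
#                         datetime.datetime(2005, 1, 1, 0, 0, 0))
# yields 1.5, i.e. one and a half days.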
def empty_result(data):
    return len(data) == 1 and data.has_key('error') and data['error']['*'] == 'emptyresult'
# Translate a dict[id] = data into a dict[page_name] = properties
def QueryTranslate(data):
    result = {}
    if empty_result(data):
        print >> sys.stderr, 'empty result'
        return result
    for p in data[u'pages'].keys():
        if not data[u'pages'][p].has_key('id'):
            print >> sys.stderr, 'no id key, skipping', data[u'pages'][p]
            continue
        if data[u'pages'][p]['id'] == 0:
            print >> sys.stderr, 'id == 0, skipping', data[u'pages'][p]
            continue
        result[data[u'pages'][p][u'title']] = {}
        if data[u'pages'][p].has_key(u'normalizedTitle'):
            raise UnormalizedTitle('Exception, UnormalizedTitle:\n' +
                                   str(data[u'pages'][p]))
        for t in data[u'pages'][p].keys():
            result[data[u'pages'][p][u'title']][t] = data[u'pages'][p][t]
    return result
def GetHistory( lang, titles, extraParams = None ):
    """
    Usage example: data = GetHistory('ru','user:yurik')
    titles may be either a title (as a string), or a list of strings
    extraParams if given must be a dict() as taken by query.GetData()
    """
    params = {'titles':query.ListToParam(titles), 'what' : 'revisions'}
    params = query.CombineParams( params, extraParams )
    return query.GetData( lang, params )
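# Example of passing an extra query parameter: ask for the five most recent
# revisions of a single page ('rvlimit' is the same knob PreloadingHistory
# uses below; the title here is only a placeholder).
def example_get_history(lang = u'fr', title = u'Paris'):
    return GetHistory(lang, title, {u'rvlimit': u'5'})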
class PreloadingHistory:
    """
    Wraps around another generator. Retrieves history of as many pages as
    stated by pageNumber from that generator, and yields them one after the
    other. Then retrieves more pages, etc.
    """
    def __init__(self, generator, depth, pageNumber=200, lang = u'fr', contents = False):
        self.lang = lang
        self.generator = []
        for p in generator:
            if type(p) == wikipedia.Page:
                print >> sys.stderr, "warning: using obsolete Page object"
                self.generator.append(p.title())
            else:
                self.generator.append(p)
        self.pageNumber = pageNumber
        self.extraParams = { u'rvlimit' : str(depth) }
        if contents:
            # also request the revision contents, keeping the rvlimit set above
            self.extraParams[u'rvcontent'] = None
    def preload(self, pages):
        data = GetHistory(self.lang, pages, self.extraParams)
        return QueryTranslate(data)
    def __iter__(self):
        # FIXME : not complete, given the depth param we must iterate over
        # each history which is 1) not complete 2) with cur_depth < depth
        # this array will contain up to pageNumber pages and will be flushed
        # after these pages have been preloaded and yielded.
        somePages = []
        for page in self.generator:
            somePages.append(page)
            if len(somePages) >= self.pageNumber:
                result = self.preload(somePages)
                for refpage in result.keys():
                    yield result[refpage]
                somePages = []
        # preload remaining pages
        result = self.preload(somePages)
        for refpage in result.keys():
            yield result[refpage]
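# Usage sketch for PreloadingHistory: fetch the three most recent revisions
# of each page in a list of titles, e.g. example_preload_history([u'Paris']).
def example_preload_history(titles, lang = u'fr'):
    for page in PreloadingHistory(titles, 3, lang = lang):
        # 'title' and 'revisions' are the keys also used by last_modified_since()
        print page[u'title'].encode('utf-8'), page[u'revisions'][0][u'timestamp']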
def last_modified_since(pages):
    for p in PreloadingHistory(pages, 1):
        time_text = p[u'revisions'][0][u'timestamp']
        t = time.strptime(time_text, '%Y-%m-%dT%H:%M:%SZ')
        now = datetime.datetime.utcnow()
        d = datetime.datetime(*t[:6])
        diff = diff_time(now, d)
        yield (p[u'title'], diff)
def elapsed_seconds(pages):
    for p in PreloadingHistory(pages, 1):
        time_text = p[u'revisions'][0][u'timestamp']
        t = time.strptime(time_text, '%Y-%m-%dT%H:%M:%SZ')
        # the timestamp is UTC, so convert with timegm() rather than the local mktime()
        yield calendar.timegm(t)
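# Usage sketch: last_modified_since() yields (title, age) pairs where age is
# the number of days (fractional) since each page was last edited.
def example_last_modified(titles):
    for title, age_in_days in last_modified_since(titles):
        print title.encode('utf-8'), age_in_days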
def GetCategory( lang, title, extraParams = None ):
    """ Usage example: data = GetCategory('ru','Date')
    extraParams if given must be a dict() as taken by query.GetData()
    """
    params = {'cptitle':query.ListToParam(title), 'what' : 'category', 'cpextended' : '' }
    params = query.CombineParams( params, extraParams )
    return query.GetData( lang, params )
class _PreloadingCategory:
    """
    Wraps around a category contents generator. Retrieves as many pages as
    stated by pageNumber and yields them one after the other. Then retrieves
    more pages, etc.
    """
    def __init__(self, title, pageNumber=200, lang = 'fr'):
        self.lang = lang
        self.title = title
        self.pageNumber = pageNumber
        self.cpfrom = None
    def preload(self):
        extraParams = { 'cplimit' : str(self.pageNumber) }
        if self.cpfrom:
            extraParams['cpfrom'] = self.cpfrom
        self.cpfrom = None # to stop iteration in __iter__() while loop
        result = GetCategory(self.lang, self.title, extraParams)
        if result.has_key('query'):
            self.cpfrom = result['query']['category']['next']
        return QueryTranslate(result)
    def __iter__(self):
        datas = self.preload()
        while self.cpfrom:
            for p in datas.keys():
                yield datas[p]
            datas = self.preload()
        for p in datas.keys():
            yield datas[p]
class PreloadingCategory:
    """
    Identical to _PreloadingCategory but can recurse into subcategories.
    """
    def __init__(self, title, recurse = False, filtered_cat = None, lang = 'fr'):
        self.recurse = recurse
        self.lang = lang
        self.cats_todo = [ title ]
        # avoid sharing a mutable default argument between instances
        if filtered_cat is None:
            filtered_cat = []
        self.cats_done = filtered_cat
    def __iter__(self):
        while self.cats_todo:
            title = self.cats_todo.pop()
            if title in self.cats_done:
                continue
            print >> sys.stderr, "getting", title.encode('utf-8')
            self.cats_done.append(title)
            preloader_generator = _PreloadingCategory(title, lang = self.lang)
            for x in preloader_generator:
                if self.recurse and x['ns'] == 14:
                    self.cats_todo.append(x['title'])
                yield x
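# Usage sketch: enumerate a category's members, descending into its
# subcategories (namespace 14); the category name is a placeholder and
# follows the same convention as the GetCategory() docstring above.
def example_walk_category(title = u'France', recurse = True):
    for page in PreloadingCategory(title, recurse = recurse):
        print page['title'].encode('utf-8'), page['ns']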
def GetInterwikies( lang, titles, extraParams = None ):
    return query.GetInterwikies( lang, titles, extraParams)
class PreloadingInterwikies:
    """
    Wraps around another generator. Retrieves the interwiki links of as many
    pages as stated by pageNumber from that generator, and yields them one
    after the other. Then retrieves more pages, etc.
    """
    def __init__(self, generator, pageNumber=200, lang = u'fr'):
        self.lang = lang
        self.generator = []
        for p in generator:
            if type(p) == wikipedia.Page:
                print >> sys.stderr, "warning: using obsolete Page object"
                self.generator.append(p.title())
            else:
                self.generator.append(p)
        self.pageNumber = pageNumber
    def preload(self, pages):
        data = GetInterwikies(self.lang, pages)
        return QueryTranslate(data)
    def __iter__(self):
        # this array will contain up to pageNumber pages and will be flushed
        # after these pages have been preloaded and yielded.
        somePages = []
        for page in self.generator:
            somePages.append(page)
            if len(somePages) >= self.pageNumber:
                result = self.preload(somePages)
                for refpage in result.keys():
                    yield result[refpage]
                somePages = []
        # preload remaining pages
        result = self.preload(somePages)
        for refpage in result.keys():
            yield result[refpage]
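# Usage sketch: preload the interwiki data of a batch of pages; only the
# 'title' key is assumed here, the rest of the returned keys depend on
# what query.GetInterwikies() sends back.
def example_preload_interwikies(titles):
    for page in PreloadingInterwikies(titles):
        print page[u'title'].encode('utf-8'), sorted(page.keys())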
def GetCategories( lang, titles, extraParams = None ):
    """
    Usage example: data = GetCategories('ru','user:yurik')
    titles may be either a title (as a string), or a list of strings
    extraParams if given must be a dict() as taken by query.GetData()
    """
    params = {'titles':query.ListToParam(titles), 'what' : 'categories', 'clextended' : ''}
    params = query.CombineParams( params, extraParams )
    return query.GetData( lang, params )
class PreloadingCategories:
    """
    Wraps around another generator. Retrieves the categories of as many pages
    as stated by pageNumber from that generator, and yields them one after
    the other. Then retrieves more pages, etc.
    """
    def __init__(self, generator, pageNumber=200, lang = u'fr'):
        self.lang = lang
        self.generator = []
        for p in generator:
            if type(p) == wikipedia.Page:
                print >> sys.stderr, "warning: using obsolete Page object"
                self.generator.append(p.title())
            else:
                self.generator.append(p)
        self.pageNumber = pageNumber
    def preload(self, pages):
        data = GetCategories(self.lang, pages)
        return QueryTranslate(data)
    def __iter__(self):
        # this array will contain up to pageNumber pages and will be flushed
        # after these pages have been preloaded and yielded.
        somePages = []
        for page in self.generator:
            somePages.append(page)
            if len(somePages) >= self.pageNumber:
                result = self.preload(somePages)
                for refpage in result.keys():
                    yield result[refpage]
                somePages = []
        # preload remaining pages
        result = self.preload(somePages)
        for refpage in result.keys():
            yield result[refpage]
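# Usage sketch: preload the categories of a batch of pages; the category
# list is assumed to come back under the 'categories' key of each page dict.
def example_preload_categories(titles):
    for page in PreloadingCategories(titles):
        print page[u'title'].encode('utf-8'), page.get(u'categories', [])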
def GetImageLinks( lang, titles, extraParams = None ):
    """
    Usage example: data = GetImageLinks('ru','user:yurik')
    titles may be either a title (as a string), or a list of strings
    extraParams if given must be a dict() as taken by query.GetData()
    """
    params = {'titles':query.ListToParam(titles), 'what' : 'imagelinks'}
    params = query.CombineParams( params, extraParams )
    return query.GetData( lang, params )
class PreloadingImageLinks:
    """
    Retrieves the image links of one page, depth entries at a time, and
    yields them one after the other. Then retrieves more links, etc.
    """
    def __init__(self, title, depth=200, lang = u'fr'):
        self.lang = lang
        self.title = title
        if type(title) == wikipedia.Page:
            print >> sys.stderr, "warning: using obsolete Page object"
            self.title = title.title()
        self.depth = depth
        self.next = None
    def preload(self):
        extraParams = { 'illimit' : str(self.depth) }
        if self.next:
            extraParams['ilcontfrom'] = self.next
        self.next = None # to stop iteration in __iter__() while loop
        result = GetImageLinks(self.lang, self.title, extraParams)
        if result.has_key('query'):
            self.next = result['query']['imagelinks']['next']
        return QueryTranslate(result)
    def __iter__(self):
        datas = self.preload()
        while self.next:
            if datas[self.title].has_key('imagelinks'):
                for p in datas[self.title]['imagelinks']:
                    yield p
            datas = self.preload()
        if datas[self.title].has_key('imagelinks'):
            for p in datas[self.title]['imagelinks']:
                yield p
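# Usage sketch: list the images used by a single page (the title is a
# placeholder); each yielded item is one raw 'imagelinks' entry.
def example_image_links(title = u'Paris', lang = u'fr'):
    for link in PreloadingImageLinks(title, lang = lang):
        print link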
def GetBackLinks( lang, titles, extraParams = None ):
    """
    Usage example: data = GetBackLinks('ru','user:yurik')
    titles may be either a title (as a string), or a list of strings
    extraParams if given must be a dict() as taken by query.GetData()
    """
    params = {'titles':query.ListToParam(titles), 'what' : 'backlinks'}
    params = query.CombineParams( params, extraParams )
    return query.GetData( lang, params )
class PreloadingBackLinks:
    """
    Retrieves the backlinks of one page, depth entries at a time, and yields
    them one after the other.
    """
    def __init__(self, title, depth=500, lang = u'fr'):
        self.lang = lang
        self.title = title
        if type(title) == wikipedia.Page:
            print >> sys.stderr, "warning: using obsolete Page object"
            self.title = title.title()
        self.depth = depth
        self.next = None
    def preload(self):
        extraParams = { 'bllimit' : str(self.depth), 'blfilter' : 'all' }
        if self.next:
            extraParams['blcontfrom'] = self.next
        self.next = None # to stop iteration in __iter__() while loop
        result = GetBackLinks(self.lang, self.title, extraParams)
        if result.has_key('query'):
            self.next = result['query']['backlinks']['next']
        return QueryTranslate(result)
    def __iter__(self):
        datas = self.preload()
        while self.next:
            if datas[self.title].has_key('backlinks'):
                for p in datas[self.title]['backlinks']:
                    yield p
            datas = self.preload()
        if datas[self.title].has_key('backlinks'):
            for p in datas[self.title]['backlinks']:
                yield p
if __name__ == "__main__":
    try:
        preload = PreloadingBackLinks(u'2005')
        count = 0
        for k in preload:
            count += 1
            print k
        print count
        #print GetBackLinks('fr', u'Accueil')
    finally:
        wikipedia.stopme()