User:RussBot/dabmaintbot.py

From Wikipedia, the free encyclopedia
#!/usr/bin/python
"""
dabmaintbot - Bot to update link counts on
[[en:Wikipedia:Disambiguation pages maintenance]]
"""

import datetime
import locale
import re, sys, traceback
import simplejson
import urllib
import wikipedia, pagegenerators

locale.setlocale(locale.LC_ALL, '')

s#Constants:
ACTIVE_CUTOFF = 100
HISTORY_LEN = 6

import datetime
started_at = datetime.datetime.now()

# cache page objects to reduce server load
pagecache = {}

def getPage(title):
    global pagecache
    if '#' in title:
        sf_title = title[:title.index('#')]
    else:
        sf_title = title
    return pagecache.setdefault(sf_title, wikipedia.Page(site, title))

def cacheput(page):
    global pagecache
    title = page.sectionFreeTitle()
    pagecache[title] = page

def prefetch(page):
    while True:
        try:
            page.get(get_redirect=True)
            return
        except wikipedia.BadTitle:
            wikipedia.output("Got BadTitle exception on %s; retrying."
                             % page.title())
            continue
        except wikipedia.Error:
            return

def refcount(page):
    if hasattr(page, "refcount"):
        return page.refcount
    data = {'action': 'query',
            'generator': 'backlinks',
            'gbltitle': page.sectionFreeTitle(),
            'gblnamespace': '0',
            'gbllimit': '500',
            'redirects': 'redirects',
            'format': 'json',
            }
    count = 0
    while True:
        wikipedia.get_throttle()
        try:
#        wikipedia.output("Getting references to %s" % page.aslink())
            reflist = site.getUrl(site.api_address(), data=data)
        except:
            traceback.print_exc(file=sys.stderr)
            continue
        try:
            result = simplejson.loads(reflist)
        except ValueError:
            continue
        if type(result) is not dict or 'query' not in result:
            return 0
        if 'redirects' in result['query']:
            for redirect in result['query']['redirects']:
                if redirect['to'] == page.sectionFreeTitle():
                    count += refcount(wikipedia.Page(site, redirect['from'])) 
        if 'pages' in result['query']:
            for ref_id in result['query']['pages']:
                refpage = result['query']['pages'][ref_id]
                if refpage['title'] != page.sectionFreeTitle():
                    count += 1
        if "query-continue" in result:
            data.update(result['query-continue']['backlinks'])
        else:
            return count

def increasing(seq):
    '''Return True if seq is uniformly increasing (from last to first),
    False otherwise'''
    for index in range( len(seq) - 1 ):
        if seq[index] <= seq[index+1]:
            return False
    return True

def fmt(num):
    return locale.format("%i", num, grouping=True)

try:
    site = wikipedia.getSite()

    #input pages
    maint_page = wikipedia.Page(site,
                    "Wikipedia:Disambiguation pages with links/Current list")
    dump_page = wikipedia.Page(site,
                    "User:RussBot/DPL")
    problem_page = wikipedia.Page(site,
                    "Wikipedia:Disambiguation pages with links/problems")
    #output pages
    result_page = wikipedia.Page(site,
                    "Wikipedia:Disambiguation pages with links/Current list")
    problem_result = wikipedia.Page(site,
                    "Wikipedia:Disambiguation pages with links/problems")

    for arg in sys.argv[1:]:
        arg = wikipedia.argHandler(arg, 'dabmaintbot')
        if arg:
            print "Unrecognized command line argument: %s" % arg
            # show help text and exit
            wikipedia.argHandler("-help", "dabmaintbot")

    mylang = site.language()

    fixed_pages = 0
    fixed_links = 0
    problems = []
    m_text = maint_page.get()

    active_r = re.compile(
        r"^# (?:'''&bull; )?\[\[(.+)\]\] *\(([0-9]*) *" +
        r"\[\[Special:Whatlinkshere/(?:.+)\|links\]\]\) *" +
        r"(?:\((?:(?:new)|(?:[-+][0-9]+))\))? *" +
        r"(?:<!-- history (.*?)-->)? *(.*?) *(?:''')? *$", re.M)
    # the groups matched by this regex are:
    # 1.  the title of a disambiguation page
    # 2.  the number of links found last time the bot ran (may be empty)
    # 3.  the history of the page's link count (may be empty), consisting of a
    #     space-separated string of numbers
    # 4.  any notes added by users at the end of the line

    inactive_r = re.compile(
        r'^# \[\[(.+)\]\] \(([0-9]+)\) history ([0-9 ]*):(.*) *$', re.M)
    # the groups matched by this regex are the same as for active_r

    # lists are demarcated by HTML comments

    # Step 1: Collect all links and histories from the last scan
    
    start_mark = u"<!-- section title="
    end_mark = u"<!-- end section -->"
    marker = 0
    new_text = []
    disambiglinks = {}
    total_count = [0, 0, 0, 0]
    sections = []
    diffs = []
    while True:
        section_start = m_text.find(start_mark, marker)
        if section_start == -1:
            break
        title_mark = section_start + len(start_mark)
        section_title = m_text[title_mark:
                               m_text.find(u" -->\n", title_mark)]
        section_marker = title_mark + len(section_title) + len(" -->\n")
        if section_marker >= len(m_text):
            wikipedia.output(
                u"ERROR: cannot locate section title in %s" % section_title)
            raise RuntimeError
        
        section_end = m_text.find(end_mark, section_marker)
        if section_end == -1:
            wikipedia.output(
                u"ERROR: cannot locate end of section %s" % section_title)
            raise RuntimeError
        marker = section_end
        sections.append((section_title, section_marker, section_end))
        sectionnumber = len(sections) - 1
        
        for item in active_r.finditer(m_text, section_marker, section_end):
            link_page_title = item.group(1)
            link_page = getPage(link_page_title)
            try:
                prefetch(link_page)
                while link_page.isRedirectPage():
                    link_page = link_page.getRedirectTarget()
                    prefetch(link_page)
                if not link_page.isDisambig():
                    continue
            except wikipedia.NoPage:
                continue
            link_page_title = link_page.sectionFreeTitle()
            if link_page_title in disambiglinks.keys():
                continue
            count = refcount(link_page)
            wikipedia.output(u"%s [%i]" % (link_page.title(), count))
            if item.group(3):
                history = item.group(3)
            else:
                history = u''
            disambiglinks[link_page_title] = {
                'section': sectionnumber,
                'title': link_page_title,
                'count': count,
                'history_text': history,
                'trailing_text': item.group(4).strip()
            }

        # search for inactive listings, which should always follow active ones
        for item in inactive_r.finditer(m_text, section_marker, section_end):
            link_page_title = item.group(1)
            link_page = getPage(link_page_title)
            try:
                prefetch(link_page)
                while link_page.isRedirectPage():
                    link_page = link_page.getRedirectTarget()
                    prefetch(link_page)
                if not link_page.isDisambig():
                    continue
            except wikipedia.NoPage:
                continue
            link_page_title = link_page.title()
            if link_page_title in disambiglinks.keys():
                continue
            count = refcount(link_page)
            wikipedia.output(u"%s [%i]" % (link_page.title(), count))
            if item.group(3):
                history = item.group(3)
            else:
                history = u''
            disambiglinks[link_page_title] = {
                'section': sectionnumber,
                'title': link_page_title,
                'count': count,
                'history_text': history,
                'trailing_text': item.group(4).strip()
            }

    # Step 2.  Collect links from data dump output page and add any that
    # aren't already in the collection

    for link_page in dump_page.linkedPages():
        try:
            prefetch(link_page)
            while link_page.isRedirectPage():
                link_page = link_page.getRedirectTarget()
                prefetch(link_page)
            if not link_page.isDisambig():
                continue
        except wikipedia.NoPage:
            continue
        link_page_title = link_page.sectionFreeTitle()
        if link_page_title in disambiglinks.keys():
            continue
        count = refcount(link_page)
        wikipedia.output(u"%s [%i]" % (link_page.title(), count))
        history = u''
        disambiglinks[link_page_title] = {
            'section': 0,  # All new articles go into 'general' until classified
            'title': link_page_title,
            'count': count,
            'history_text': history,
            'trailing_text': u''
        }

    # Step 3.  Sort links by section and count, and output page
    marker = 0
    for (number, (section_name, section_marker, section_end)
         ) in enumerate(sections):
        section_links = [link for link in disambiglinks.values()
                         if link['section'] == number]
        section_links.sort(key=lambda i:i['count'], reverse=True)
        section_count = [0, 0]
        new_text.append(m_text[marker:section_marker])
        active = True
        for link in section_links:
            if link['count'] < ACTIVE_CUTOFF and active:
                active = False
                new_text.append(u"<!-- Inactive articles:\n")
            if link['history_text']:
                history = [int(n) for n in link['history_text'].split(" ")]
            else:
                history = []
            history = [link['count']] + history
            while len(history) > HISTORY_LEN:
                del history[-1]
            if len(history) == 1:
                link['diff'] = 'new'
            else:
                link['diff'] = "%+i" % (history[0] - history[1])
                diffs.append( (history[0]-history[1], link['title']) )
                if history[0] < history[1]:
                    fixed_pages += 1
                    fixed_links += (history[1] - history[0])
            link['history_text'] = " ".join(str(x) for x in history)
##            print link[1]+":", history
            
            if max(history) < ACTIVE_CUTOFF / 4:
                # discard items that have no significant history
                continue

            if active:
                section_count[0] += 1
                section_count[1] += link['count']
                item = (
u"[[%(title)s]] (%(count)i [[Special:Whatlinkshere/%(title)s|links]]) " +
u"(%(diff)s)<!-- history %(history_text)s--> %(trailing_text)s") % link
                # bullet items that have shown unusual or persistent increases
                if (len(history) > 1 and
                        history[0]-history[1] > ACTIVE_CUTOFF / 2
                   ) or (
                        len(history) == HISTORY_LEN and
                        increasing(history) and
                        history[0] - history[-1] > ACTIVE_CUTOFF
                   ):
                    prefix = "'''&bull; "
                    suffix = "'''"
                    item.rstrip("'")
                    problems.append(
u"* [[%(title)s]] (%(count)i [[Special:Whatlinkshere/%(title)s|links]]) (%(diff)s)\n"
                        % link)
                else:
                    prefix = suffix = ""
                new_text.append("# %s%s%s\n" % (prefix, item, suffix))
            else:
                total_count[2] += 1
                total_count[3] += link['count']
                new_text.append(
u"# [[%(title)s]] (%(count)i) history %(history_text)s: %(trailing_text)s\n"
                    % link)
        if not active:
            new_text.append("-->\n")
        marker = section_end
        new_text.append(
            u"\n Section '%s' contains %i links to %i active articles.\n" %
            (section_name, section_count[1], section_count[0]))
        total_count[0] += section_count[0]
        total_count[1] += section_count[1]

    diffs.sort()
    statistics_point = m_text.find(u"|}")
    if statistics_point >= 0:
        text = m_text[marker:statistics_point]
        text = re.sub(r"(?s)<!--banner-->.*?<!--/banner-->",
"""<!--banner-->
'''''Since last week, at least %s links to %s pages have been fixed!'''''
<!--/banner-->"""
                          % (fmt(fixed_links), fmt(fixed_pages)), text)
        top10 = ["\n===Top 10 increases==="]
        for item in reversed(diffs[-10:]):
            top10.append("# [[%s]] (%i)" % (item[1], item[0]))
        top10.append("===Top 10 decreases===")
        for item in diffs[:10]:
            top10.append("# [[%s]] (%i)" % (item[1], item[0]))
        top10.append("<!--/banner-->")
        text = text.replace("<!--/banner-->", "\n".join(top10))
        new_text.append(text)
        marker = statistics_point
        new_text.append(u"|-\n")
        today = datetime.date.today()
        new_text.append(u"| %4i-%02i-%02i || %s || %s || %s || %s\n"
                        % (today.year, today.month, today.day,
                           fmt(total_count[0]+total_count[2]),
                           fmt(total_count[0]),
                           fmt(total_count[1]+total_count[3]),
                           fmt(total_count[1])))
        
    new_text.append(m_text[marker:])
    wikipedia.setAction(u"Disambiguation page maintenance script")
    result_page.put(u"".join(new_text))
    prob_text = problem_page.get()
    header_start = prob_text.index("<noinclude>")
    header_end = prob_text.index("</noinclude>") + len("</noinclude>")
    problem_result.put(prob_text[header_start:header_end] + "\n" +
                      u"".join(problems))
    
finally:
    elapsed = datetime.datetime.now() - started_at
    print "elapsed time = " + str(elapsed)
    wikipedia.stopme()