User:Qwerfjkl (bot)/code/CategoryMonitor

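# CategoryMonitor: watches the 'recentchange' EventStream for 'categorize' events that add
# pages to the configured CS1 error-tracking categories, waits roughly 15 minutes, and, if
# the error is still present, notifies the responsible editor on their user talk page.
#
# Per-language behaviour comes from User:Qwerfjkl (bot)/CategoryMonitorConfig.json; the keys
# used below are 'site', 'CS1CATS', 'removedmessage', 'sectionheader', 'causesmessage' and
# 'editsummary'.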
import time, re, pywikibot
from pywikibot.comms.eventstreams import EventStreams
from mwparserfromhell import parse
import json

config_page = pywikibot.Page(pywikibot.Site('en', 'wikipedia'), 'User:Qwerfjkl (bot)/CategoryMonitorConfig.json')
config = json.loads(config_page.text)
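# Swap each language's configured site string for a live pywikibot.Site object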
for lang in config:
    config[lang]['site'] = pywikibot.Site(config[lang]['site'])

stream = EventStreams(streams=['recentchange'],
                     since=config['en']['site'].server_time()) # base time off of enwiki
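# Keep only human (non-bot) 'categorize' events from Wikipedias listed in the config, where the
# category is one of that wiki's CS1 error-tracking categories ('CS1CATS') and the edit summary
# does not contain the configured 'removedmessage' (i.e. the page was added to the category,
# not removed from it).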
def stream_filter(data):
    # print(data)
    lang = data['server_name'].split('.')[0]
    return (data['server_name'].endswith('.wikipedia.org')
            and data['type'] == 'categorize'
            and data['bot'] == False
            and lang in config
            and data['title'] in config[lang]['CS1CATS']
            and config[lang]['removedmessage'] not in data['comment'])

# stream.register_filter(stream_filter, ftype='all')

# print(config)
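# Edits tagged as undos, reverts or rollbacks (or that have since been reverted) are skipped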
SKIPTAGS = ['mw-undo', 'mw-revert', 'mw-rollback', 'mw-reverted', 'mw-manual-revert']
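# prev holds the previous event's timestamp and wiki; group collects consecutive matching
# events that share them, i.e. the category changes produced by a single edit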
prev = False
group = []
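# Handle one group of categorize events (all from the same edit): work out which article was
# edited and which CS1 error categories it gained, locate the triggering revision, wait until
# about 15 minutes after the edit, and, if the errors are still present and the user has not
# opted out, post a talk-page notification built from the User:Qwerfjkl (bot)/inform/* templates.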
def notify(data):
    print('Handling case')
    ts = data['timestamp']
    lang = data['server_name'].replace('.wikipedia.org', '')
    site = config[lang]['site']
    # if lang == 'sq':
    #     print(data)
    #     input()
    if group:
        # Extract the distinct page titles named in the events' comments (each comment links the affected page as [[:Title]])
        titles = list(set([re.match(r"\[\[:(.+?)\]\]", data['comment']).group(1) for data in group]))
        print(titles)
        for title in titles:
            changes = [data for data in group if title in data['comment']]
            cat_changes = list(set(data['title'] for data in changes if data['title'] in config[lang]['CS1CATS'] and config[lang]['removedmessage'] not in data['comment']))
            if cat_changes:
                user = changes[0]['user']
                timestamp = pywikibot.Timestamp.fromISOformat(ts, sep='T')
                page = pywikibot.Page(site, title)
                if not page.text or page.namespace().id != 0: # blank, non-existent, or not in mainspace
                    print('Bad page title', page.title())
                    continue
                    # todo: use 'notify_url': 'https://sq.wikipedia.org/w/index.php?diff=2643546&oldid=1587213'
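                # Find the revision whose timestamp matches the event; its revid is used for the diff link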
                revisions = [revision for revision in list(page.revisions()) if revision['timestamp'] == timestamp]
                if len(revisions) != 1:
                    # print(revisions)
                    print('Error, multiple/no edits with that timestamp', page.title())
                    continue
                else:
                    revision = revisions[0]
                    if any(tag in revision['tags'] for tag in SKIPTAGS):
                        print('Skipped, skiptags')
                        continue # skip
                    revid = revision['revid']
                    print(f'Special:Diff/{revid}')
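                # Wait until roughly 15 minutes after the edit before re-checking the page,
                # which gives the editor a chance to fix the error themselves first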
                time_diff = (60 * 15
                             + int(float(timestamp.posix_timestamp_format()))
                             - int(float(pywikibot.Timestamp.fromISOformat(site.server_time(), sep='T').posix_timestamp_format())))
                if time_diff > 0:
                    print(f'Sleeping {time_diff} seconds')
                    time.sleep(time_diff)
                else:
                    print(time_diff)
                print('Sleep over, checking')
                page.get(force=True)
                pagecats = [cat.title() for cat in page.categories()]
                print(pagecats, cat_changes)

                current_errors = [cat for cat in cat_changes if cat in pagecats] # errors still present on page
                if not any(cat in pagecats for cat in cat_changes):
                    print('Error has been fixed, skipping')
                    continue
                count = str(len(current_errors))
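                # enwiki special case: when both the 'bare URL' and 'missing title' categories are
                # present, merge them into one sentinel entry so they are reported as a single cause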
                if lang == 'en' and 'Category:CS1 errors: bare URL' in current_errors and 'Category:CS1 errors: missing title' in current_errors:
                    current_errors.remove('Category:CS1 errors: bare URL')
                    current_errors.remove('Category:CS1 errors: missing title')
                    current_errors.insert(0, 'Category:CS1 errors: bare URLCategory:CS1 errors: missing title')
                if lang == 'en':
                    cause = lambda x : 'Category:CS1 errors: bare URL|bare URL]] and [[:Category:CS1 errors: missing title|missing title' if x == 'Category:CS1 errors: bare URLCategory:CS1 errors: missing title' else x+'|'+x.replace('Category:CS1 errors: ', '')
                elif lang == 'sq':
                    # x + | + all content after the second colon (otherwise just the input)
                    cause = lambda x : x+'|'+ ( x[[i for i, n in enumerate(x) if n == ':'][1]+1:].lstrip() ) if x.count(':') >= 2 else x
                else: # fallback
                    cause = lambda x : x
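                # Notify on the editor's user talk page, following a redirect if there is one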
                notif_page = pywikibot.Page(site, 'User talk:'+user)
                if notif_page.isRedirectPage():
                    notif_page = notif_page.getRedirectTarget()

                # Allow optout
                cont = False
                for template in notif_page.templatesWithParams():
                    if template[0].title() == 'Template:Bots' or template[0].title() == 'Template:Nobots':
                        for param in template[1]:
                            if 'optout' in param:
                                optout_values = param.split('=')[1].strip().split(',')
                                if 'cs1-errors' in optout_values:
                                    cont = True
                if cont: # user opted out of these notifications; skip this title
                    print('Skipped, user opted-out')
                    continue
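                # Build the notification: a section header plus subst'ed /inform/top, /inform/middle
                # (one per error category still present) and /inform/bottom templates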

                header = f"== {config[lang]['sectionheader'].format(title=title)} =="
                top = f"\n{{{{subst:User:Qwerfjkl (bot)/inform/top|count={count}|page={title}|diff={str(revid)}}}}}" 
                middle = "\n".join([f"{{{{subst:User:Qwerfjkl (bot)/inform/middle|causes={config[lang]['causesmessage'].format(cause=cause(category))}|cat={category}|page={title}|diff={str(revid)}}}}}" for category in current_errors]) 
                end = f"\n{{{{subst:User:Qwerfjkl (bot)/inform/bottom|page={title}|diff={str(revid)}}}}}"
                text = header+top+middle+end
                # print(text)
                current = notif_page.text
                notif_page.text = current + '\n' + text
                try:
                    notif_page.save(f"/* {parse(config[lang]['sectionheader'].format(title=title)).strip_code()} */ {config[lang]['editsummary'].format(title=title)}", botflag=False)
                    print(f'[[{title}]] was added to: {", ".join(cat_changes)} at {ts} by {user}')
                except Exception as e:
                    print(f"Exception on {page}: {e}")

                
print('Edit handler started')
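# Main loop: consecutive stream events that share a timestamp and wiki are treated as products
# of a single edit; matching categorize events among them are collected in `group`, and once the
# timestamp moves on, notify(prev) processes the finished group.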
while True:
    change = next(iter(stream))
    if stream_filter(change):
        print(change)
    current = {'timestamp': change['meta']['dt'], 'server_name': change['server_name']}
   
    if (prev
            and current['timestamp'] == prev['timestamp']
            and current['server_name'] == prev['server_name']
            and stream_filter(change)):
        print('Added change to list to notify')
        group.append(change)
    elif prev and group:
        print('Notifying and clearing')
        notify(prev)  # prev supplies the shared timestamp and wiki of the grouped events
            
        group.clear()
        if stream_filter(change):
            group.append(change)
    else:
        pass
        # print(f'Prev: {prev}, Group: {group} was therefore skipped.')
    prev = current.copy()