from os import environ
import asyncio
import re
from copy import deepcopy
from datetime import datetime, timedelta
from random import randrange

import requests
from bs4 import BeautifulSoup
from nio import ClientConfig, AsyncClient, LoginResponse


def get_accesstoken_from_file(accesstoken_path):
    with open(accesstoken_path, 'r', encoding='utf8') as accesstoken_file:
        single_accesstoken = accesstoken_file.read().strip()
    return single_accesstoken


def extract_image_url(image_html):
    # only recent articles wrap the url in quotes, so the regex has to match
    # both the quoted and the unquoted form
    image_url_fragment = re.findall(r'url\("?(.*?)"?\)', image_html.attrs['style'])[0]
    return 'https:'+image_url_fragment


def sanitize_category(raw_category):
    return raw_category.replace(' ', '').replace(':', '').replace('.', '').lower()


def get_blog():
    url = 'https://news.blizzard.com/en-us/'
    html = requests.get(url, timeout=60).text
    soup = BeautifulSoup(html, 'html.parser')
    base_url = 'https://news.blizzard.com'

    blog = []
    for featured_article in soup.select('#featured-articles article'):
        image_url = extract_image_url(featured_article.find(class_='Card-image'))
        text_list = featured_article.select('.text-truncate-ellipsis')
        category = sanitize_category(text_list[0].text)
        title = text_list[1].text
        url = base_url+featured_article.find('a').attrs['href']
        blog.append({
            'image': image_url,
            'category': category,
            'title': title,
            'description': '',  # featured articles don't have a description
            'url': url,
        })

    for recent_article in soup.select('#recent-articles article'):
        image_url = extract_image_url(recent_article.find(class_='ArticleListItem-image'))
        category = sanitize_category(recent_article.find(class_='ArticleListItem-subtitle').find(class_='ArticleListItem-labelInner').text)
        title = recent_article.find(class_='ArticleListItem-title').text
        description = recent_article.find(class_='ArticleListItem-description').find(class_='h6').text
        url = base_url+recent_article.find('a').attrs['href']
        blog.append({
            'image': image_url,
            'category': category,
            'title': title,
            'description': description,
            'url': url
        })

    # reverse order so the oldest article is at [0]
    # we want to iterate later from oldest to newest
    blog.reverse()
    return blog


def get_body(post):
    body = post['title']+"\n"
    if post['description']:
        body += post['description']+"\n"
    body += post['url']
    return body


def get_formatted_body(post):
    formatted_body = '<a href="'+post['url']+'">'
    formatted_body += post['title']
    formatted_body += '</a>'
    if post['description']:
        formatted_body += '<p><em>'+post['description']+'</em></p>'
    return formatted_body

async def main():
    # initialize new client
    config = ClientConfig(store_sync_tokens=True)
    matrix = AsyncClient(homeserver, config=config)

    # login by replaying a LoginResponse built from the stored access token
    # instead of doing a password login; the device_id is only a placeholder
    login_response = LoginResponse(mxid, 'xxx', accesstoken)
    await matrix.receive_response(login_response)
    await matrix.set_presence('offline')

    # filter out everything except m.room.member (for invites)
    sync_filter = {
        'room': {
            'state': {
                'types': ['m.room.member'],
                'lazy_load_members': True
            },
            'timeline': {
                'types': ['invalid']
            },
            'ephemeral': {
                'types': ['invalid']
            }
        }
    }

    # set to now so the first loop iteration scrapes immediately
    next_update = datetime.now()

    # use this event type to store our url cache
    cache_event_type = 'de.lubiland.snowstorm-matrix.cache'

    while True:
        # do sync first to e.g. accept an admin room invite
        sync = await matrix.sync(timeout=30000, sync_filter=sync_filter)
        print('last sync: '+str(datetime.now()))
        for room_id in sync.rooms.invite:
            print('joining: '+room_id)
            await matrix.join(room_id)

        if next_update < datetime.now():
            # refresh url cache from the admin room state
            cache = {}
            for category in category_list:
                cache_state = await matrix.room_get_state_event(room_id=admin_room,
                                                                event_type=cache_event_type,
                                                                state_key=category)
                if category not in cache:
                    cache[category] = []
                if hasattr(cache_state, 'content') and 'url_list' in cache_state.content:
                    cache[category] += cache_state.content['url_list']
            old_cache = deepcopy(cache)

            # scrape all blog posts and process them
            blog = get_blog()
            for post in blog:
                category = post['category']
                # check if the post matches a tracked category and is not cached yet
                if category in category_list and post['url'] not in cache[category]:
                    # post url not found in cache
                    # announce new post to matrix rooms
                    print('new post: '+post['url'])
                    content = {
                        'msgtype': 'm.notice',
                        'body': get_body(post),
                        'format': 'org.matrix.custom.html',
                        'formatted_body': get_formatted_body(post)
                    }
                    for room_id in matrix.rooms:
                        # don't send updates to the admin room
                        if room_id != admin_room:
                            print('to room: '+room_id)
                            await matrix.room_send(room_id=room_id,
                                                   message_type='m.room.message',
                                                   content=content)
                    # add url to cache
                    cache[category] += [post['url']]
                else:
                    # post is already cached or not in a tracked category
                    pass

            # cleanup cache and push it as room state
            for category in cache.keys():
                # trim the cache to the 100 most recent urls
                while len(cache[category]) > 100:
                    cache[category].pop(0)
                # set new cache event
                if old_cache[category] != cache[category]:
                    await matrix.room_put_state(room_id=admin_room,
                                                event_type=cache_event_type,
                                                state_key=category,
                                                content={'url_list': cache[category]})

            # wait between 15 and 30 minutes to randomize scraping
            next_update = datetime.now() + timedelta(minutes=randrange(15, 30))
            print('next scrape: '+str(next_update))


homeserver = environ['HOMESERVER']
print('homeserver: '+homeserver)
mxid = environ['MXID']
print('mxid: '+mxid)
accesstoken = get_accesstoken_from_file(environ['ACCESSTOKEN_FILE'])
print('accesstoken_file: '+environ['ACCESSTOKEN_FILE'])
admin_room = environ['ADMIN_ROOM']
print('admin_room: '+admin_room)
category_list = environ['CATEGORY'].split(',')
print('categories:')
print(category_list)

asyncio.run(main())