from os import environ
import requests
import re
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
import asyncio
from nio import ClientConfig, AsyncClient, LoginResponse


def get_accesstoken_from_file(accesstoken_path):
    # read the access token from a file so it doesn't live in the environment
    with open(accesstoken_path, 'r') as accesstoken_file:
        return accesstoken_file.read().strip()


def get_blog():
    url = 'https://news.blizzard.com/en-us/'
    html = requests.get(url).text
    soup = BeautifulSoup(html, 'html.parser')
    base_url = 'https://news.blizzard.com'
    blog = []

    # featured articles at the top of the page
    feature_list_html = soup.find_all(class_='FeaturedArticle-link')
    for feature_html in feature_list_html:
        image_html = feature_html.find(class_='Card-image')
        # the image URL is embedded in an inline CSS background-image
        image_url_fragment = re.findall(r'url\("(.*?)"\)', image_html.attrs['style'])[0]
        image_url = 'https:'+image_url_fragment
        text_list = feature_html.find_all(class_='text-truncate-ellipsis')
        blog.append({
            'image': image_url,
            'category': text_list[0].contents[0].replace(' ', '').replace(':', '').lower(),
            'title': text_list[1].contents[0],
            'description': '',
            'url': base_url+feature_html.attrs['href'],
        })

    # regular article list below the featured section
    article_list_html = soup.find_all(class_='ArticleListItem')
    for article_html in article_list_html:
        image_html = article_html.find(class_='ArticleListItem-image')
        image_url_fragment = re.findall(r'url\((.*?)\)', image_html.attrs['style'])[0]
        image_url = 'https:'+image_url_fragment
        content_html = article_html.find(class_='ArticleListItem-contentGrid')
        blog.append({
            'image': image_url,
            'category': content_html.find(class_='ArticleListItem-subtitle')
                                    .find(class_='ArticleListItem-labelInner')
                                    .contents[0].replace(' ', '').replace(':', '').lower(),
            'title': content_html.find(class_='ArticleListItem-title').contents[0],
            'description': content_html.find(class_='ArticleListItem-description')
                                       .find(class_='h6').contents[0],
            'url': base_url+article_html.find(class_='ArticleLink').attrs['href'],
        })

    # reverse the order so the oldest article is at [0],
    # because we want to iterate later from oldest to newest
    blog.reverse()
    return blog


def get_body(post):
    # plain-text fallback body
    body = post['title']+"\n"
    if post['description']:
        body += post['description']+"\n"
    body += post['url']
    return body


def get_formatted_body(post):
    # HTML body for clients that render org.matrix.custom.html
    formatted_body = '<a href="'+post['url']+'">'
    formatted_body += post['title']
    formatted_body += '</a>'
    if post['description']:
        formatted_body += '<p>'+post['description']+'</p>'
    return formatted_body


async def main():
    # initialize a new client
    config = ClientConfig(store_sync_tokens=True)
    matrix = AsyncClient(homeserver, config=config)

    # log in by replaying a LoginResponse built from the stored access token
    # (the device id is just a placeholder)
    login_response = LoginResponse(mxid, 'xxx', accesstoken)
    await matrix.receive_response(login_response)

    # filter out everything except m.room.member (for invites)
    sync_filter = {
        'room': {
            'state': {
                'types': ['m.room.member'],
                'lazy_load_members': True
            },
            'timeline': {
                'types': ['invalid']
            },
            'ephemeral': {
                'types': ['invalid']
            }
        }
    }

    # force a scrape on the first loop iteration
    next_update = datetime.now()

    # use this state event type to store our url cache
    cache_event_type = 'de.lubiland.snowstorm-matrix.cache'

    while True:
        # sync first, to e.g. accept an admin room invite
        sync = await matrix.sync(sync_filter=sync_filter)
        for room_id in sync.rooms.invite:
            print('joining '+room_id)
            await matrix.join(room_id)

        if next_update < datetime.now():
            # refresh the url cache from the admin room state
            cache_state = await matrix.room_get_state_event(room_id=admin_room,
                                                            event_type=cache_event_type,
                                                            state_key=category)
            if hasattr(cache_state, 'content') and 'url_list' in cache_state.content:
                cache = cache_state.content['url_list']
            else:
                cache = []

            # scrape all blog posts and process them
            blog = get_blog()
            for post in blog:
                # announce the post only if its url is not cached yet
                # and it matches our category
                if post['url'] not in cache and post['category'] == category:
                    content = {
                        'msgtype': 'm.notice',
                        'body': get_body(post),
                        'format': 'org.matrix.custom.html',
                        'formatted_body': get_formatted_body(post)
                    }
                    for room_id in matrix.rooms:
                        # don't send updates to the admin room
                        if room_id != admin_room:
                            await matrix.room_send(room_id=room_id,
                                                   message_type='m.room.message',
                                                   content=content)
                    # add the url to the cache
                    cache += [post['url']]

            # trim the cache:
            # len(blog) is usually bigger than the number of posts in our
            # category, so using len(blog) instead of the latter gives some buffer
            while len(cache) > len(blog):
                cache.pop(0)

            # store the new cache state event
            await matrix.room_put_state(room_id=admin_room,
                                        event_type=cache_event_type,
                                        state_key=category,
                                        content={'url_list': cache})

            next_update = datetime.now() + timedelta(minutes=15)


homeserver = environ['HOMESERVER']
mxid = environ['MXID']
accesstoken = get_accesstoken_from_file(environ['ACCESSTOKEN_FILE'])
admin_room = environ['ADMIN_ROOM']
category = environ['CATEGORY']

asyncio.run(main())
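
# Example invocation, as a sketch: every value below is a placeholder (not a
# real homeserver, account, room, or token path), and the script filename is
# assumed. CATEGORY must match a lowercased, colon- and space-stripped
# category label from the Blizzard news page, e.g. 'hearthstone'.
#
#   HOMESERVER='https://matrix.example.org' \
#   MXID='@snowstorm:example.org' \
#   ACCESSTOKEN_FILE='/run/secrets/matrix_accesstoken' \
#   ADMIN_ROOM='!abcdefghijklmnop:example.org' \
#   CATEGORY='hearthstone' \
#   python3 snowstorm.py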