"""Announce new Blizzard news posts in Matrix rooms.

The script scrapes https://news.blizzard.com/en-us/ and, for every post whose
URL is not yet in the cache, sends an ``m.notice`` with the account that
belongs to the post's game.  The cache of already announced URLs is kept as a
state event in the admin room.

Configuration is read from the environment:

* ``HOMESERVER_NAME``, ``HOMESERVER_URL``, ``MXID_PREFIX``, ``ADMIN_ROOM``
* ``ACCESSTOKEN_<GAME>_FILE``: path to a file holding the access token of the
  account ``@<MXID_PREFIX><game>:<HOMESERVER_NAME>``
"""
import asyncio
import re
from datetime import datetime, timedelta
from os import environ

import requests
from bs4 import BeautifulSoup
from nio import AsyncClient, ClientConfig, InviteEvent, LoginResponse

# Seconds to wait for the news page before giving up, so a stalled
# connection cannot hang the bot forever.
REQUEST_TIMEOUT = 30


def get_accesstoken_from_file(accesstoken_path):
    """Return the access token stored in the file at *accesstoken_path*.

    Surrounding whitespace (e.g. a trailing newline) is stripped.
    """
    # the context manager closes the file even if reading fails
    with open(accesstoken_path, 'r') as accesstoken_file:
        return accesstoken_file.read().strip()


async def on_event(room, event):
    """Automatically join *room* when *event* is an invite.

    The client that joins is looked up in ``matrix`` by the event's
    ``state_key``.
    """
    if getattr(event, 'membership', None) != 'invite':
        return
    print('joining '+room.room_id)
    await matrix[event.source['state_key']].join(room.room_id)


def _game_slug(label):
    """Normalise a game label: drop spaces and colons, then lowercase."""
    return label.replace(' ', '').replace(':', '').lower()


def _mxid_for(game):
    """Build the full mxid of the account responsible for *game*."""
    return '@'+mxid_prefix+game+':'+homeserver_name


def _first_client():
    """Return the first configured client.

    It is used for everything that concerns the admin room.
    """
    return matrix[next(iter(matrix))]


def get_blog():
    """Scrape the Blizzard news page and return its posts.

    Returns a list of dicts with the keys ``image``, ``game``, ``title``,
    ``description`` and ``url``.  Featured articles come first and always
    have an empty ``description``.

    Raises whatever ``requests.get`` raises, including a timeout after
    ``REQUEST_TIMEOUT`` seconds.
    """
    url = 'https://news.blizzard.com/en-us/'
    base_url = 'https://news.blizzard.com'

    html = requests.get(url, timeout=REQUEST_TIMEOUT).text
    soup = BeautifulSoup(html, 'html.parser')

    blog = []

    for feature_html in soup.find_all(class_='FeaturedArticle-link'):
        image_html = feature_html.find(class_='Card-image')
        # the image is only available as a CSS url("...") in the style attribute
        image_url_fragment = re.findall(r'url\("(.*?)"\)',
                                        image_html.attrs['style'])[0]
        text_list = feature_html.find_all(class_='text-truncate-ellipsis')

        blog.append({
            'image': 'https:'+image_url_fragment,
            'game': _game_slug(text_list[0].contents[0]),
            'title': text_list[1].contents[0],
            'description': '',
            'url': base_url+feature_html.attrs['href'],
        })

    for article_html in soup.find_all(class_='ArticleListItem'):
        image_html = article_html.find(class_='ArticleListItem-image')
        # unlike the featured articles, this pattern expects no quotes
        # around the url
        image_url_fragment = re.findall(r'url\((.*?)\)',
                                        image_html.attrs['style'])[0]
        content_html = article_html.find(class_='ArticleListItem-contentGrid')

        game_label = (content_html
                      .find(class_='ArticleListItem-subtitle')
                      .find(class_='ArticleListItem-labelInner')
                      .contents[0])
        description = (content_html
                       .find(class_='ArticleListItem-description')
                       .find(class_='h6')
                       .contents[0])

        blog.append({
            'image': 'https:'+image_url_fragment,
            'game': _game_slug(game_label),
            'title': content_html.find(class_='ArticleListItem-title').contents[0],
            'description': description,
            'url': base_url+article_html.find(class_='ArticleLink').attrs['href'],
        })

    return blog


def get_body(post):
    """Return the plain-text message body for *post*.

    Title, optional description and url, separated by newlines.
    """
    body = post['title']+"\n"
    if post['description']:
        body += post['description']+"\n"
    body += post['url']
    return body


def get_formatted_body(post):
    """Return the ``formatted_body`` for *post*.

    The title wrapped in newlines, followed by the description wrapped in
    blank lines when the post has one.
    """
    # NOTE(review): these literals contain only newlines although the result
    # is sent as org.matrix.custom.html; confirm whether HTML markup was
    # intended here.
    formatted_body = '\n'
    formatted_body += post['title']
    formatted_body += '\n'
    if post['description']:
        formatted_body += '\n\n'+post['description']+'\n\n'
    return formatted_body


async def main():
    """Log in every configured account, then poll the blog and sync forever.

    Raises RuntimeError if no access token is configured, because the admin
    room is always accessed through the first configured client.
    """
    if not accesstoken:
        raise RuntimeError('No ACCESSTOKEN_<GAME>_FILE environment variable '
                           'found, at least one accesstoken is required.')

    event_type_prefix = 'de.lubiland.snowstorm-matrix.'

    # Defined once up front: it is needed by the first sync of every account
    # as well as by the long-running syncs in the main loop.
    # The 'invalid' type presumably matches no real event, so only membership
    # state is returned. TODO confirm.
    sync_filter = {
        'room': {
            'state': {
                'types': ['m.room.member'],
                'lazy_load_members': True
            },
            'timeline': {
                'types': ['invalid']
            },
            'ephemeral': {
                'types': ['invalid']
            }
        }
    }

    next_batch = {}
    for game in accesstoken:
        # initialize new client
        mxid = _mxid_for(game)
        config = ClientConfig(store_sync_tokens=True)
        matrix[mxid] = AsyncClient(homeserver_url, config=config)

        # login: feed a prepared response to the client instead of
        # performing a password login, since only the token is known
        login_response = LoginResponse(mxid, 'xxx', accesstoken[game])
        await matrix[mxid].receive_response(login_response)

        matrix[mxid].add_event_callback(on_event, InviteEvent)

        # do a first sync
        sync = await matrix[mxid].sync(timeout=3000, sync_filter=sync_filter)
        next_batch[mxid] = sync.next_batch

    next_update = datetime.now()
    while True:
        if next_update < datetime.now():
            # refresh url cache
            cache_state = await _first_client().room_get_state_event(
                room_id=admin_room,
                event_type=event_type_prefix+'cache',
                state_key='')
            if hasattr(cache_state, 'content') and 'url_list' in cache_state.content:
                cache = cache_state.content['url_list']
            else:
                cache = []

            # scrape all blog posts and process them
            blog = get_blog()
            for post in blog:
                if post['url'] in cache:
                    # already announced
                    continue

                mxid = _mxid_for(post['game'])
                if mxid in matrix:
                    # announce new post to matrix rooms
                    content = {
                        'msgtype': 'm.notice',
                        'body': get_body(post),
                        'format': 'org.matrix.custom.html',
                        'formatted_body': get_formatted_body(post)
                    }
                    for room_id in matrix[mxid].rooms:
                        # don't send updates to the admin room
                        if room_id != admin_room:
                            await matrix[mxid].room_send(
                                room_id=room_id,
                                message_type='m.room.message',
                                content=content)
                else:
                    # no accesstoken for the calculated mxid
                    # NOTE(review): formatted_body carries no markup and is
                    # identical to body; confirm whether that is intended.
                    notice = 'No accesstoken for '+mxid+' available.'
                    content = {
                        'msgtype': 'm.notice',
                        'body': notice,
                        'format': 'org.matrix.custom.html',
                        'formatted_body': notice
                    }
                    # send the message with the first available matrix client,
                    # because we will always have at least one accesstoken
                    await _first_client().room_send(
                        room_id=admin_room,
                        message_type='m.room.message',
                        content=content)

                # add url to cache and keep it no longer than the blog itself
                # by dropping the oldest entries
                cache.append(post['url'])
                excess = len(cache) - len(blog)
                if excess > 0:
                    del cache[:excess]

                # persist after every post, so an interruption does not
                # lead to repeated announcements
                await _first_client().room_put_state(
                    room_id=admin_room,
                    event_type=event_type_prefix+'cache',
                    content={'url_list': cache})

            next_update = datetime.now() + timedelta(minutes=15)

        # sync every account; invites are handled by the on_event callback
        for mxid in next_batch:
            sync = await matrix[mxid].sync(timeout=30000,
                                           sync_filter=sync_filter,
                                           since=next_batch[mxid])
            next_batch[mxid] = sync.next_batch


homeserver_name = environ['HOMESERVER_NAME']
homeserver_url = environ['HOMESERVER_URL']
mxid_prefix = environ['MXID_PREFIX']
admin_room = environ['ADMIN_ROOM']

# game name (lowercase) -> access token, taken from ACCESSTOKEN_<GAME>_FILE
accesstoken = {}
for var in environ:
    if (game := re.match(r'^ACCESSTOKEN_([A-Z]*)_FILE$', var)) is not None:
        accesstoken[game[1].lower()] = get_accesstoken_from_file(environ[var])

# mxid -> AsyncClient, filled by main()
matrix = {}

if __name__ == '__main__':
    asyncio.run(main())