"""Matrix bot that announces new posts from news.blizzard.com.

The bot scrapes the Blizzard news front page, filters the posts by one
category and sends every post it has not announced yet to all joined rooms
except the admin room.  The list of already announced urls is stored as a
state event in the admin room.

Configuration is read from the environment when run as a script:
HOMESERVER, MXID, ACCESSTOKEN_FILE, ADMIN_ROOM and CATEGORY.
"""
import asyncio
import re
from datetime import datetime, timedelta
from html import escape
from os import environ
from random import randrange

import requests
from bs4 import BeautifulSoup
from nio import ClientConfig, AsyncClient, LoginResponse, InviteEvent

BASE_URL = 'https://news.blizzard.com'
NEWS_URL = BASE_URL + '/en-us/'

# seconds to wait for the news page before giving up on this scrape
REQUEST_TIMEOUT = 30

# use this event type to store our url cache
CACHE_EVENT_TYPE = 'de.lubiland.snowstorm-matrix.cache'

# only recent articles use "" to escape the url, so we have to search for
# with quotes and without quotes
IMAGE_URL_PATTERN = re.compile(r'url\("?(.*?)"?\)')


def get_accesstoken_from_file(accesstoken_path):
    """Return the access token stored in *accesstoken_path*, stripped."""
    # the context manager closes the file even if reading fails
    with open(accesstoken_path, 'r') as accesstoken_file:
        return accesstoken_file.read().strip()


def extract_image_url(image_html):
    """Return the absolute image url found in the element's inline style.

    *image_html* is a parsed element whose ``style`` attribute contains a
    ``url(...)`` expression.  Raises IndexError if no such expression exists.
    """
    image_url_fragment = IMAGE_URL_PATTERN.findall(image_html.attrs['style'])[0]
    return 'https:'+image_url_fragment


def sanitize_category(raw_category):
    """Return *raw_category* lowercased, without spaces and colons."""
    return raw_category.replace(' ', '').replace(':', '').lower()


def get_blog():
    """Scrape the news page and return its posts, oldest first.

    Every post is a dict with the keys ``image``, ``category``, ``title``,
    ``description`` and ``url``.

    Raises requests.RequestException if the page cannot be fetched, times
    out or answers with an HTTP error status.
    """
    response = requests.get(NEWS_URL, timeout=REQUEST_TIMEOUT)
    # an error page contains no articles; fail loudly instead of returning []
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')

    blog = []

    for featured_article in soup.select('#featured-articles article'):
        text_list = featured_article.select('.text-truncate-ellipsis')
        blog.append({
            'image': extract_image_url(featured_article.find(class_='Card-image')),
            'category': sanitize_category(text_list[0].text),
            'title': text_list[1].text,
            'description': '',  # featured articles don't have a description
            'url': BASE_URL+featured_article.find('a').attrs['href'],
        })

    for recent_article in soup.select('#recent-articles article'):
        subtitle = recent_article.find(class_='ArticleListItem-subtitle')
        description = recent_article.find(class_='ArticleListItem-description')
        blog.append({
            'image': extract_image_url(recent_article.find(class_='ArticleListItem-image')),
            'category': sanitize_category(subtitle.find(class_='ArticleListItem-labelInner').text),
            'title': recent_article.find(class_='ArticleListItem-title').text,
            'description': description.find(class_='h6').text,
            'url': BASE_URL+recent_article.find('a').attrs['href'],
        })

    # reverse order so the oldest article is at [0]
    # we want to iterate later from oldest to newest
    blog.reverse()
    return blog


def get_body(post):
    """Return the plain text message body for *post*.

    The body is the title, the description (if any) and the url, each on
    its own line.
    """
    lines = [post['title']]
    if post['description']:
        lines.append(post['description'])
    lines.append(post['url'])
    return "\n".join(lines)


def get_formatted_body(post):
    """Return the HTML message body for *post*.

    Title and description come from a scraped web page, so they are escaped
    before being placed into the HTML body.
    """
    # NOTE(review): the literals here consist of line breaks only; any HTML
    # markup (and a link to post['url']) seems to be missing - confirm
    # against the intended message layout.
    formatted_body = '\n'
    formatted_body += escape(post['title'], quote=False)
    formatted_body += '\n'
    if post['description']:
        formatted_body += '\n\n'+escape(post['description'], quote=False)+'\n\n'
    return formatted_body


async def _announce_new_posts(matrix):
    """Scrape the blog once and send every uncached post of our category.

    Reads the module globals ``admin_room`` and ``category``.  The url cache
    is loaded from and written back to the admin room state.  If the scrape
    fails or yields no posts, the cache is left untouched.
    """
    try:
        blog = get_blog()
    except requests.RequestException as error:
        print('scrape failed: '+str(error))
        return

    if not blog:
        # trimming the cache against an empty blog would wipe it and lead to
        # re-announcing every post after the next successful scrape
        print('scrape returned no posts')
        return

    # refresh url cache
    cache_state = await matrix.room_get_state_event(room_id=admin_room,
                                                    event_type=CACHE_EVENT_TYPE,
                                                    state_key=category)
    if hasattr(cache_state, 'content') and 'url_list' in cache_state.content:
        cache = cache_state.content['url_list']
    else:
        print('cache is empty')
        cache = []

    for post in blog:
        # skip posts that were announced already or are not in our category
        if post['url'] in cache or post['category'] != category:
            continue

        # announce new post to matrix rooms
        print('new post: '+post['url'])
        content = {
            'msgtype': 'm.notice',
            'body': get_body(post),
            'format': 'org.matrix.custom.html',
            'formatted_body': get_formatted_body(post)
        }
        for room_id in matrix.rooms:
            # don't send updates to the admin room
            if room_id != admin_room:
                print('to room: '+room_id)
                await matrix.room_send(room_id=room_id,
                                       message_type='m.room.message',
                                       content=content)

        # add url to cache
        cache.append(post['url'])

    # trim the cache
    # len(blog) is usually bigger than the count of posts in our category,
    # so with len(blog) instead of the latter we have some buffer
    excess = len(cache) - len(blog)
    if excess > 0:
        del cache[:excess]

    # set new cache event
    await matrix.room_put_state(room_id=admin_room,
                                event_type=CACHE_EVENT_TYPE,
                                state_key=category,
                                content={'url_list': cache})


async def main():
    """Run the bot forever: sync, accept invites and announce new posts.

    Reads the module globals ``homeserver``, ``mxid``, ``accesstoken``,
    ``admin_room`` and ``category``, which are set from the environment
    when the file is run as a script.
    """
    # initialize new client
    config = ClientConfig(store_sync_tokens=True)
    matrix = AsyncClient(homeserver, config=config)

    # login
    login_response = LoginResponse(mxid, 'xxx', accesstoken)
    await matrix.receive_response(login_response)

    # filter out everything except m.room.member (for invites)
    sync_filter = {
        'room': {
            'state': {
                'types': ['m.room.member'],
                'lazy_load_members': True
            },
            'timeline': {
                'types': ['invalid']
            },
            'ephemeral': {
                'types': ['invalid']
            }
        }
    }

    # setting this to enforce a scrape at first loop
    next_update = datetime.now()

    while True:
        # do sync first to e.g. accept an admin room invite
        sync = await matrix.sync(timeout=30000, sync_filter=sync_filter)
        print('last sync: '+str(datetime.now()))

        for room_id in sync.rooms.invite:
            print('joining: '+room_id)
            await matrix.join(room_id)

        if next_update < datetime.now():
            # scrape all blog posts and process them
            await _announce_new_posts(matrix)

            # wait 15 to 29 minutes to randomize scraping
            # (randrange excludes the upper bound)
            next_update = datetime.now() + timedelta(minutes=randrange(15, 30))
            print('next scrape: '+str(next_update))


if __name__ == '__main__':
    homeserver = environ['HOMESERVER']
    print('homeserver: '+homeserver)

    mxid = environ['MXID']
    print('mxid: '+mxid)

    accesstoken = get_accesstoken_from_file(environ['ACCESSTOKEN_FILE'])
    print('accesstoken_file: '+environ['ACCESSTOKEN_FILE'])

    admin_room = environ['ADMIN_ROOM']
    print('admin_room: '+admin_room)

    category = environ['CATEGORY']
    print('category: '+category)

    asyncio.run(main())