You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
snowstorm-matrix/scrape.py

206 lines
7.9 KiB
Python

from os import environ
import requests
import re
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
import asyncio
from nio import ClientConfig, AsyncClient, LoginResponse, InviteEvent
def get_accesstoken_from_file(accesstoken_path):
    """Read a single access token from a text file.

    Args:
        accesstoken_path: Path of the file that holds the token.

    Returns:
        The file's content with leading and trailing whitespace removed.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # the context manager closes the file even when read() raises
    with open(accesstoken_path, 'r') as accesstoken_file:
        return accesstoken_file.read().strip()
async def on_event(room, event):
    """Event callback that automatically accepts room invitations.

    Events without a `membership` attribute, or whose membership is
    anything other than 'invite', are ignored.
    """
    if not hasattr(event, 'membership'):
        return
    if event.membership != 'invite':
        return

    print('joining '+room.room_id)
    # the event's state key selects which client from `matrix` joins
    client = matrix[event.source['state_key']]
    join = await client.join(room.room_id)
    print(join)
def get_blog():
    """Scrape the Blizzard news front page and return its posts.

    Returns:
        A list of dicts, one per post, each with the keys 'image', 'game',
        'title', 'description' and 'url'. Featured articles come first and
        always carry an empty 'description'; regular list items follow.

    Raises:
        requests.RequestException: If the page cannot be fetched (including
            a timeout).
        AttributeError, IndexError, KeyError: If the page markup no longer
            matches the class names and inline styles this parser expects.
    """
    base_url = 'https://news.blizzard.com'
    url = 'https://news.blizzard.com/en-us/'
    # without a timeout a stalled connection would block the caller forever
    html = requests.get(url, timeout=30).text
    soup = BeautifulSoup(html, 'html.parser')

    blog = []

    # featured articles: the image is an inline style of the form url("...")
    feature_list_html = soup.find_all(class_='FeaturedArticle-link')
    for feature_html in feature_list_html:
        image_html = feature_html.find(class_='Card-image')
        image_url_fragment = re.findall(r'url\("(.*?)"\)',
                                        image_html.attrs['style'])[0]
        # the fragment carries no scheme (presumably protocol-relative)
        image_url = 'https:'+image_url_fragment

        # first truncated text is the game name, second one the title
        text_list = feature_html.find_all(class_='text-truncate-ellipsis')

        blog.append({
            'image': image_url,
            # normalised game name: spaces removed, lowercased
            'game': text_list[0].contents[0].replace(' ', '').lower(),
            'title': text_list[1].contents[0],
            'description': '',
            'url': base_url+feature_html.attrs['href'],
        })

    # regular articles: here the inline style is matched as unquoted url(...)
    article_list_html = soup.find_all(class_='ArticleListItem')
    for article_html in article_list_html:
        image_html = article_html.find(class_='ArticleListItem-image')
        image_url_fragment = re.findall(r'url\((.*?)\)',
                                        image_html.attrs['style'])[0]
        image_url = 'https:'+image_url_fragment

        content_html = article_html.find(class_='ArticleListItem-contentGrid')

        blog.append({
            'image': image_url,
            'game': content_html.find(class_='ArticleListItem-subtitle').find(class_='ArticleListItem-labelInner').contents[0].replace(' ', '').lower(),
            'title': content_html.find(class_='ArticleListItem-title').contents[0],
            'description': content_html.find(class_='ArticleListItem-description').find(class_='h6').contents[0],
            'url': base_url+article_html.find(class_='ArticleLink').attrs['href'],
        })

    return blog
def get_body(post):
    """Build the plain-text message body for a post.

    The result consists of the title, the description (only when it is
    non-empty) and the URL, separated by newlines.
    """
    lines = [post['title']]
    if post['description']:
        lines.append(post['description'])
    lines.append(post['url'])
    return "\n".join(lines)
def get_formatted_body(post):
    """Build the HTML message body for a post.

    The title is rendered as a level-5 heading linking to the post URL; a
    non-empty description follows as a paragraph. All three values are
    HTML-escaped because they are text taken from a scraped page and may
    contain '&', '<', '>' or quotes.

    Args:
        post: Dict with the string values 'title', 'description' and 'url'.

    Returns:
        The HTML fragment as a string.
    """
    # imported here because the module does not otherwise import `html`
    from html import escape

    formatted_body = '<h5><a href="'+escape(post['url'])+'">'
    formatted_body += escape(post['title'])
    formatted_body += '</a></h5>'
    if post['description']:
        formatted_body += '<p>'+escape(post['description'])+'</p>'
    return formatted_body
async def main():
    """Set up one Matrix client per configured game and relay blog posts.

    For every entry in `device` a client is created, given its stored
    credentials and synced once, resuming from the sync token stored as a
    state event in the admin room when one is available. Afterwards the
    function loops forever: every 30 minutes it scrapes the blog and sends
    each post to all rooms of the client for that post's game, and on every
    iteration it syncs all clients and stores their new sync token in the
    admin room.

    Reads the module-level names `device`, `matrix`, `mxid_prefix`,
    `homeserver_name`, `homeserver_url` and `admin_room`; fills `matrix`.
    """
    event_type_prefix = 'de.lubiland.snowstorm-matrix.'
    next_batch_event_type = event_type_prefix+'next_batch'

    # identical for every client and every sync, so it is built only once;
    # only membership state is requested, timeline and ephemeral events are
    # restricted to the type 'invalid' (presumably so that nothing matches)
    sync_filter = {
        'room': {
            'state': {
                'types': ['m.room.member'],
                'lazy_load_members': True
            },
            'timeline': {
                'types': ['invalid']
            },
            'ephemeral': {
                'types': ['invalid']
            }
        }
    }

    next_batch = {}
    for game in device:
        # initialize new client
        mxid = '@'+mxid_prefix+game+':'+homeserver_name
        config = ClientConfig(store_sync_tokens=True)
        matrix[mxid] = AsyncClient(homeserver_url,
                                   config=config)

        # login: hand the client a pre-built LoginResponse made from the
        # configured device id and access token
        login_response = LoginResponse(mxid,
                                       device[game]['id'],
                                       device[game]['accesstoken'])
        await matrix[mxid].receive_response(login_response)

        matrix[mxid].add_event_callback(on_event, InviteEvent)

        # do a first sync, resuming from the stored token if there is one
        next_batch_state = await matrix[mxid].room_get_state_event(room_id=admin_room,
                                                                   event_type=next_batch_event_type,
                                                                   state_key=mxid)
        # room_get_state_event can also return an error response object
        # without a `content` attribute; treat that like a missing token
        stored_state = getattr(next_batch_state, 'content', None) or {}
        if 'token' in stored_state:
            try:
                sync = await matrix[mxid].sync(timeout=3000,
                                               sync_filter=sync_filter,
                                               since=stored_state['token'])
                next_batch[mxid] = sync.next_batch
                continue
            except Exception as error:
                # deliberately best-effort: fall through to a fresh sync.
                # Exception (not a bare except) keeps task cancellation and
                # KeyboardInterrupt working.
                print('sync with stored token failed for '+mxid+': '+repr(error))

        # when there is no next_batch token or first sync threw an error,
        # then do a first sync without next_batch
        print('doing first sync for '+mxid)
        sync = await matrix[mxid].sync(timeout=3000,
                                       sync_filter=sync_filter)
        next_batch[mxid] = sync.next_batch

    next_update = datetime.now()
    while True:
        if next_update < datetime.now():
            # NOTE(review): every scrape sends all posts currently on the
            # page; nothing here records which posts were already sent
            blog = get_blog()

            for post in blog:
                mxid = '@'+mxid_prefix+post['game']+':'+homeserver_name
                if mxid in matrix:
                    content = {
                        'msgtype': 'm.notice',
                        'body': get_body(post),
                        'format': 'org.matrix.custom.html',
                        'formatted_body': get_formatted_body(post)
                    }
                    for room_id in matrix[mxid].rooms:
                        await matrix[mxid].room_send(room_id=room_id,
                                                     message_type='m.room.message',
                                                     content=content)
                else:
                    content = {
                        'msgtype': 'm.notice',
                        'body': 'No accesstoken for '+mxid+' available.',
                        'format': 'org.matrix.custom.html',
                        'formatted_body': ('No <code>accesstoken</code> for ' +
                                           '<code>'+mxid+'</code> available.')
                    }
                    # send the message with the first available matrix client,
                    # because we will always have at least one accesstoken
                    await matrix[next(iter(matrix))].room_send(room_id=admin_room,
                                                               message_type='m.room.message',
                                                               content=content)
            next_update = datetime.now() + timedelta(minutes=30)
            # (a state-event fetch used to follow here; its result was never
            # read and it indexed `matrix` with the last post's mxid, which
            # raises KeyError when that game has no client)

        # sync every client (this is also where invites get processed via
        # the event callback) and persist the new token in the admin room
        for mxid in next_batch:
            sync = await matrix[mxid].sync(timeout=10000,
                                           sync_filter=sync_filter,
                                           since=next_batch[mxid])
            next_batch[mxid] = sync.next_batch
            await matrix[mxid].room_put_state(room_id=admin_room,
                                              event_type=next_batch_event_type,
                                              state_key=mxid,
                                              content={'token': next_batch[mxid]})
# connection settings; each of these environment variables must be set,
# a missing one raises KeyError right at startup
homeserver_name = environ['HOMESERVER_NAME']
homeserver_url = environ['HOMESERVER_URL']
mxid_prefix = environ['MXID_PREFIX']
admin_room = environ['ADMIN_ROOM']

# credentials per game, keyed by the lowercased game name:
# DEVICEID_<GAME> supplies 'id', ACCESSTOKEN_<GAME>_FILE supplies the path
# of the file from which 'accesstoken' is read
device = {}

# first pass: create one entry per DEVICEID_<GAME> variable
for var in environ:
    game = re.match('^DEVICEID_([A-Z]*)$', var)
    if game is None:
        continue
    device[game.group(1).lower()] = {'id': environ[var]}

# second pass: attach the access tokens; this has to run after the first
# pass is complete because it indexes into the existing entries (a token
# variable without a matching DEVICEID_<GAME> raises KeyError)
for var in environ:
    game = re.match('^ACCESSTOKEN_([A-Z]*)_FILE$', var)
    if game is None:
        continue
    device[game.group(1).lower()]['accesstoken'] = get_accesstoken_from_file(environ[var])

# clients keyed by mxid; filled by main() and read by on_event()
matrix = {}

asyncio.run(main())