#!/usr/bin/env python3
"""
HushUp - Mastodon Mute Helper

This script operates in two modes:

1. Registration Mode (--register):
   - Prompts for your Mastodon instance URL (default is https://mastodon.social).
   - Automatically registers the "HushUp" application with the given instance.
   - Using the OAuth flow, displays a URL for you to open and authorize the app.
   - You then paste the resulting authorization code into the script.
   - The script exchanges the code for an access token and saves the client credentials,
     instance URL, access token, and account name to a .env file.

2. Normal Mode (no --register flag):
   - Loads credentials (client_id, client_secret, API URL, and access token) from the .env file.
   - Uses the access token to fetch your timeline, display activity statistics,
     and prompt for muting users.

An optional flag (--clear) deletes any existing credential files to force a fresh registration.
"""
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
from collections import Counter
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from mastodon import Mastodon, MastodonError, MastodonUnauthorizedError
|
|
from prettytable import PrettyTable
|
|
from dotenv import load_dotenv, set_key
|
|
from yaspin import yaspin
|
|
|
|
# Pull settings from a local .env file into the environment, if one exists
load_dotenv()

# --- Logging: INFO and above, rendered as "LEVEL: message" ---
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
|
|
|
|
# --- Tunable settings: read from the environment (.env), else use the default ---
# Upper bound on timeline API calls made in one session
MAX_REQUESTS = int(os.environ.get('HUSHUP_MAX_REQUESTS', '20'))
# Number of timeline items requested per API call
ITEMS_PER_REQUEST = int(os.environ.get('HUSHUP_ITEMS_PER_REQUEST', '80'))
# Pause, in seconds, between consecutive normal requests
RATE_LIMIT_DELAY = float(os.environ.get('HUSHUP_RATE_LIMIT_DELAY', '0.5'))
# Default mute length in seconds (21600 s = 6 h)
MUTE_DURATION = int(os.environ.get('HUSHUP_MUTE_DURATION', '21600'))
# Default activity threshold used when listing users
MIN_ACTIVITIES = int(os.environ.get('HUSHUP_MIN_ACTIVITIES', '10'))
|
|
|
|
def clear_credentials():
    """Remove the stored credential files, logging each deletion.

    Checks for ``.env`` and then ``clientcred.secret`` relative to the
    current working directory; a file that is not present is skipped.
    """
    for path in ('.env', 'clientcred.secret'):
        if not os.path.exists(path):
            continue
        os.remove(path)
        logging.info(f'Deleted {path}.')
|
|
|
|
def register_app(api_base_url: str) -> tuple[str, str]:
    """Register the "HushUp" application with a Mastodon instance.

    The client credentials are taken directly from the value returned by
    ``Mastodon.create_app`` rather than re-parsed from disk. They are still
    written to ``clientcred.secret`` (via ``to_file``) so that
    ``clear_credentials`` keeps having a file to remove.

    Args:
        api_base_url: Base URL of the Mastodon instance to register with.

    Returns:
        A ``(client_id, client_secret)`` tuple for the newly registered app.

    Exits:
        Logs the error and terminates the process with status 1 if the
        registration request or the credential-file write fails.
    """
    try:
        client_id, client_secret = Mastodon.create_app(
            'HushUp',
            scopes=['read', 'write', 'follow'],
            redirect_uris='urn:ietf:wg:oauth:2.0:oob',
            api_base_url=api_base_url,
            to_file='clientcred.secret',
        )
    except (MastodonError, OSError) as e:
        # MastodonError: network/API failure; OSError: could not write the secret file
        logging.error(f'Error registering application: {e}')
        sys.exit(1)

    logging.info('Application registered successfully as "HushUp".')
    return client_id, client_secret
|
|
|
|
def oauth_flow(api_base_url: str, client_id: str, client_secret: str) -> Mastodon:
    """Run the interactive out-of-band OAuth authorization and log in.

    Prints an authorization URL, reads the authorization code the user
    pastes on stdin, and exchanges it through ``log_in``.

    Args:
        api_base_url: Base URL of the Mastodon instance.
        client_id: Client id of the registered application.
        client_secret: Client secret of the registered application.

    Returns:
        The ``Mastodon`` client on which ``log_in`` was called.

    Exits:
        Logs the error and terminates the process with status 1 when any
        step raises ``MastodonError``.
    """
    oob_redirect = 'urn:ietf:wg:oauth:2.0:oob'
    wanted_scopes = ('read', 'write', 'follow')

    try:
        client = Mastodon(
            client_id=client_id,
            client_secret=client_secret,
            api_base_url=api_base_url
        )
        auth_url = client.auth_request_url(
            scopes=list(wanted_scopes),
            redirect_uris=oob_redirect
        )

        # Walk the user through the manual authorization steps
        instructions = (
            '\n=== Authorization Required ===',
            '1. Open this URL in your browser:',
            f'\n{auth_url}\n',
            '2. Authorize the application',
            '3. Paste the authorization code below\n',
        )
        for line in instructions:
            print(line)

        pasted_code = input('Enter authorization code: ').strip()

        client.log_in(
            code=pasted_code,
            redirect_uri=oob_redirect,
            scopes=list(wanted_scopes)
        )
    except MastodonError as e:
        logging.error(f'OAuth failed: {e}')
        sys.exit(1)
    else:
        return client
|
|
|
|
def _qualified_acct(account) -> str:
    """Return ``user@domain`` for *account*, deriving the domain when ``acct`` lacks one."""
    acct = account['acct']
    if '@' in acct:
        return acct
    # A bare ``acct`` carries no domain; take the host part of the profile URL instead
    domain = account['url'].split('/')[2] if 'url' in account else 'unknown'
    return f'{acct}@{domain}'


def _retry_after_seconds(error: Exception, default: int = 30) -> int:
    """Return the integer ``Retry-After`` value attached to *error*, or *default*."""
    response = getattr(error, 'response', None)
    headers = getattr(response, 'headers', None) or {}
    try:
        return int(headers['Retry-After'])
    except (KeyError, TypeError, ValueError):
        # Header missing or not a plain number of seconds
        return default


def fetch_activity_counts(mastodon: Mastodon, hours: int = 48) -> tuple[Counter, Counter]:
    """Count toots and boosts per user on the home timeline.

    Pages backwards through ``timeline_home`` until a status older than
    *hours* is seen, an empty page is returned, or ``MAX_REQUESTS`` request
    attempts have been made. Rate-limited attempts count towards that limit,
    so a persistently rate-limited session terminates instead of retrying
    forever.

    Args:
        mastodon: Authenticated client used to read the home timeline.
        hours: Size of the look-back window, in hours.

    Returns:
        ``(toots, boosts)`` counters keyed by ``user@domain``. A status with
        a truthy ``reblog`` field is counted as a boost for the account that
        posted it; anything else is counted as a toot.

    Exits:
        Logs the error and terminates the process with status 1 on any
        ``MastodonError`` that is not recognised as a rate-limit response.
    """
    user_toots_counter = Counter()
    user_boosts_counter = Counter()

    # NOTE(review): assumes toot['created_at'] is a timezone-aware datetime - confirm
    time_threshold = datetime.now(timezone.utc) - timedelta(hours=hours)
    last_id = None
    more_items = True
    request_count = 0
    total_processed = 0

    with yaspin(text='Analyzing timeline activities...', color='cyan') as spinner:
        while more_items and request_count < MAX_REQUESTS:
            if request_count > 0:
                time.sleep(RATE_LIMIT_DELAY)

            # Count the attempt before issuing it so failed (rate-limited)
            # requests still move the loop towards MAX_REQUESTS.
            request_count += 1
            try:
                timeline = mastodon.timeline_home(
                    max_id=last_id,
                    limit=ITEMS_PER_REQUEST
                )
            except MastodonError as e:
                if '429' in str(e):
                    retry_after = _retry_after_seconds(e)
                    logging.warning(f"Rate limited. Waiting {retry_after} seconds...")
                    time.sleep(retry_after)
                    continue
                logging.error(f'Failed to fetch timeline: {e}')
                spinner.fail("✘")
                sys.exit(1)

            if not timeline:
                break

            for toot in timeline:
                if toot['created_at'] < time_threshold:
                    # Reached the edge of the look-back window: stop paging
                    more_items = False
                    break

                username = _qualified_acct(toot['account'])
                if toot.get('reblog'):
                    user_boosts_counter[username] += 1
                else:
                    user_toots_counter[username] += 1

                # Next page starts below the last status that was processed
                last_id = toot['id']
                total_processed += 1

            spinner.text = (f"Processed {total_processed} toots "
                            f"(batch {request_count}/{MAX_REQUESTS})")

        spinner.ok(f"✔ Processed {total_processed} toots total")
    return user_toots_counter, user_boosts_counter
|
|
|
|
def display_activity_table(
    user_toots_counter: Counter,
    user_boosts_counter: Counter,
    min_activities: int = MIN_ACTIVITIES
) -> tuple[list, dict]:
    """Print a per-user activity table and return the users it lists.

    Args:
        user_toots_counter: Toot count per user.
        user_boosts_counter: Boost count per user.
        min_activities: Only users whose combined count is strictly greater
            than this value are listed.

    Returns:
        ``(filtered_users, combined_counts)``: the listed users ordered by
        descending combined count, and a dict mapping every user seen in
        either counter to toots + boosts.
    """
    combined_counts = {}
    for name in set(user_toots_counter) | set(user_boosts_counter):
        combined_counts[name] = user_toots_counter[name] + user_boosts_counter[name]

    def total_of(entry):
        return entry[1]

    ranked = sorted(combined_counts.items(), key=total_of, reverse=True)

    filtered_users = []
    for name, total in ranked:
        if total > min_activities:
            filtered_users.append(name)

    table = PrettyTable()
    table.field_names = ['User', 'Toots', 'Boosts', 'Total']
    table.align['User'] = 'l'
    for name in filtered_users:
        row = [
            name,
            user_toots_counter[name],
            user_boosts_counter[name],
            combined_counts[name],
        ]
        table.add_row(row)

    print(table)
    return filtered_users, combined_counts
|
|
|
|
def mute_users(
    mastodon: Mastodon,
    users: list,
    activity_counts: dict,
    mute_duration: int = MUTE_DURATION
) -> None:
    """Ask, user by user, whether to mute, and mute those confirmed.

    Only the exact answer ``yes`` (case-insensitive, surrounding whitespace
    ignored) confirms a mute. The account is looked up with
    ``account_search`` and the first result is muted.

    Args:
        mastodon: Authenticated client used for the search and mute calls.
        users: User names to offer for muting, in prompt order.
        activity_counts: Activity total per user, shown in the prompt.
        mute_duration: Mute length in seconds.
    """
    for user in users:
        activity_total = activity_counts[user]
        hours = mute_duration // 3600
        question = (
            f'\nUser {user} has {activity_total} recent activities. '
            f'Mute for {hours}h? (yes/no): '
        )
        answer = input(question).strip().lower()
        if answer != 'yes':
            continue

        try:
            matches = mastodon.account_search(user, limit=1)
            if not matches:
                logging.warning(f'Account not found: {user}')
                continue

            account_id = matches[0]['id']
            mastodon.account_mute(account_id, duration=mute_duration)

            muted_until = datetime.now() + timedelta(seconds=mute_duration)
            stamp = muted_until.strftime("%Y-%m-%d %H:%M")
            logging.info(f'Muted {user} until {stamp}')
        except MastodonError as e:
            # A failed mute is reported; the loop moves on to the next user
            logging.error(f'Muting failed: {e}')
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    Returns:
        Namespace with boolean ``register`` and ``clear`` attributes and an
        integer ``min_activities`` (defaulting to ``MIN_ACTIVITIES``).
    """
    parser = argparse.ArgumentParser(
        description='HushUp - Mastodon Mute Helper (OAuth Version)'
    )

    # Boolean switches, registered in the order they appear in --help
    switches = (
        ('--register', 'Perform new OAuth registration'),
        ('--clear', 'Delete all existing credentials'),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action='store_true', help=help_text)

    parser.add_argument(
        '--min_activities',
        type=int,
        default=MIN_ACTIVITIES,
        help='Minimum activities for muting consideration'
    )

    namespace = parser.parse_args()
    return namespace
|
|
|
|
def _register_and_save_credentials() -> None:
    """Register the app, run the OAuth flow and store the credentials in .env."""
    print('\n=== HushUp Registration ===')
    api_url = input(
        'Enter your Mastodon instance URL [https://mastodon.social]: '
    ).strip() or 'https://mastodon.social'

    client_id, client_secret = register_app(api_url)
    mastodon = oauth_flow(api_url, client_id, client_secret)

    try:
        account = mastodon.account_verify_credentials()
    except MastodonUnauthorizedError:
        logging.error('Authorization failed - please try again')
        sys.exit(1)
    except MastodonError as e:
        # Any other API/network failure: report it instead of dumping a traceback
        logging.error(f'Could not verify credentials: {e}')
        sys.exit(1)

    username = account['acct']
    saved_values = (
        ('HUSHUP_CLIENT_ID', client_id),
        ('HUSHUP_CLIENT_SECRET', client_secret),
        ('HUSHUP_API_URL', api_url),
        ('HUSHUP_ACCESS_TOKEN', mastodon.access_token),
        ('HUSHUP_USERNAME', username),
    )
    for key, value in saved_values:
        set_key('.env', key, value)

    print('\nRegistration successful!')
    print(f'Authenticated as @{username}')


def _connect_with_saved_credentials() -> Mastodon:
    """Build a client from the HUSHUP_* environment variables and verify it."""
    access_token = os.getenv('HUSHUP_ACCESS_TOKEN')
    if not access_token:
        # Nothing stored yet: skip the API call that is bound to be rejected
        logging.error('No stored credentials found. Please register with --register')
        sys.exit(1)

    try:
        mastodon = Mastodon(
            client_id=os.getenv('HUSHUP_CLIENT_ID'),
            client_secret=os.getenv('HUSHUP_CLIENT_SECRET'),
            access_token=access_token,
            api_base_url=os.getenv('HUSHUP_API_URL', 'https://mastodon.social')
        )
        account = mastodon.account_verify_credentials()
    except MastodonUnauthorizedError:
        logging.error('Invalid/expired credentials. Please re-register with --register')
        sys.exit(1)
    except Exception as e:
        # CLI boundary: report any other failure and exit non-zero
        logging.error(f'Connection failed: {e}')
        sys.exit(1)

    logging.info(f'Authenticated as @{account["acct"]}')
    return mastodon


def main():
    """Entry point: dispatch to clear, registration or normal mode.

    * ``--clear`` deletes the stored credential files; without
      ``--register`` the program then returns.
    * ``--register`` registers the application, runs the OAuth flow and
      stores the resulting credentials in ``.env``, then returns.
    * Otherwise the stored credentials are used to analyse the home
      timeline, print the activity table and prompt for mutes.

    A ``KeyboardInterrupt`` raised during registration, connection or the
    analysis prints a cancellation message and exits with status 0.
    """
    args = parse_args()

    if args.clear:
        clear_credentials()
        if not args.register:
            return

    try:
        if args.register:
            _register_and_save_credentials()
            return

        # Normal operation mode
        mastodon = _connect_with_saved_credentials()
        toots, boosts = fetch_activity_counts(mastodon)
        filtered_users, counts = display_activity_table(toots, boosts, args.min_activities)
        mute_users(mastodon, filtered_users, counts)
    except KeyboardInterrupt:
        print('\nOperation cancelled by user')
        sys.exit(0)
|
|
|
|
# Run the command-line interface only when this file is executed as a script.
if __name__ == '__main__':
    main()