Announcing: A Smart Two-Way Bridge for Counter-Strike 1.6 and Discord!
Ever wanted to see what’s happening on your classic Counter-Strike 1.6 server right from Discord? Or maybe let your Discord community chat with players in-game, even if they’re not at their PC?
I’ve built a new Python bot that does just that, creating a seamless, two-way bridge between your CS 1.6 server and a designated Discord channel. It’s a lightweight, smart solution to keep your community connected.
What It Does: The Key Features
This isn’t just a simple log-dumper. It’s a smart bridge designed to be useful, not noisy.
- Smart Human Detection: This is the best part. The bot stays quiet and suppresses all server messages (like map changes or bot-only kills) when the server is empty. As soon as the first human player joins, the bot wakes up and starts relaying all activity. When the last human leaves, it goes quiet again. No more spamming your Discord channel when no one is playing!
- Two-Way Chat: Messages from Discord are sent directly into the in-game chat. In-game messages (
andsay) are relayed to Discord, complete with team icons (π΅/π΄) and even a tombstone emoji (πͺ¦) if the player is dead.say_team
- Full Game Event Reporting: Get real-time updates for all important events:
- Kills:
Player A killed Player B with an awp π₯
- Joins/Leaves:
π Player C joined the server
- Bomb Events:
π£ The bomb has been planted!
- Round/Win Events:
orπ Round started!π΄ Terrorists Win! (CT 5, T 2)
- Kills:
- GeoIP Player Flags: See where your players are from! The bot automatically looks up player IPs and assigns a country flag (e.g., π¨π¦, πΊπΈ). Bots get a π€ icon, and any failed human lookups get a π³οΈ.
How It Works
The bot is built on two simple principles:
- UDP Log Listener: You configure your CS 1.6 server to send its logs in real-time to the bot (using the
command). The bot listens on a port (default:logaddress_add), parses these logs, and formats them for Discord.8009
- Asynchronous RCON Client: When a user types a message in your Discord channel, the bot uses RCON (the server’s remote-control protocol) to send a
command to the game, broadcasting the message to all players.say
How to Set Up Your Own Bridge
Ready to try it? You’ll need a server (a simple VPS or even a home machine) to run the Python script.
Step 1: Get the Script & Install Dependencies
First, save the Python code above as
You’ll need Python 3 installed. Then, install the required Python libraries using pip:
Bash
pip install discord.py geoip2 tomli
(Note:
Step 2: Get the GeoIP Database
For the country flag feature, you need to download the free GeoLite2 Country database from MaxMind.
- Go to the MaxMind GeoLite2 free database page and sign up.
- Download the “GeoLite2-Country” database (it will be a
file)..mmdb
- Place this file (e.g.,
) on your server in a location the bot can access.GeoLite2-Country.mmdb
Step 3: Create Your
config.toml
Next, create a file named
Here is a template. You must fill in the required values.
Ini, TOML
# --- Required Settings ---
# Your Discord Bot Token (get from Discord Developer Portal)
discord_token = "YOUR_BOT_TOKEN_HERE"
# The ID of the Discord channel you want to bridge
discord_channel_id = "YOUR_CHANNEL_ID_HERE"
# Your Counter-Strike 1.6 server's IP or hostname
cs_host = "12.34.56.78"
# Your Counter-Strike 1.6 server's RCON port (usually the same as game port)
cs_port = 27015
# Your server's RCON password (from server.cfg)
cs_rcon_password = "YOUR_RCON_PASSWORD_HERE"
# --- Optional Settings ---
# Path to the GeoIP database you downloaded in Step 2
geoip_db = "/path/to/GeoLite2-Country.mmdb"
# The IP address for the bot to listen on.
# "0.0.0.0" is usually correct to listen on all available IPs.
listen_host = "0.0.0.0"
# The port for the bot to listen for CS logs on.
# This MUST match what you set in Step 4.
listen_port = 8009
# Set to 'true' to allow bot-on-bot kills and other bot-only
# activity to be posted even when no humans are on.
allow_bot_messages = false
# The prefix for bot commands in Discord (e.g., !players)
# The bridge will ignore messages starting with this.
discord_prefix = "!"
- To get a Bot Token: You need to create an “Application” in the Discord Developer Portal. Create a Bot, and be sure to enable the Message Content Intent under the “Bot” tab.
- To get a Channel ID: In Discord, enable Developer Mode (Settings > Advanced), then right-click your channel and select “Copy ID”.
Step 4: Configure Your CS 1.6 Server
Now, tell your Counter-Strike server to send its logs to your bot.
Add the following line to your server’s
log on
logaddress_add BOT_SERVER_IP:8009
You will need to restart your server (or change maps) for this to take effect.
Step 5: Run the Bot!
You’re all set. Go to the directory with your script and config file and run:
Bash
python3 cs_discord_bridge.py
If all goes well, you’ll see console messages indicating it has logged into Discord and is listening for logs. Now, when you join your server, you should see the activity pop up in your Discord channel!
That’s it! You now have a smart, modern bridge to your classic CS 1.6 server. Feel free to grab the code and try it out.
#!/usr/bin/env python3
"""
cs_discord_bridge.py β Two-way Discord β Counter-Strike 1.6 bridge
Features:
- Chat relay (say, say_team) with tombstone for dead players
- Join / leave (posts leave messages always)
- Kill events (killer β victim, with flags, headshots)
- Suicide / world kills
- Bomb events (spawned, dropped, got, planted, defused, bombed) with π£ emoji
- Discord β CS chat via asynchronous RCON client
- Consistent GeoIP flags (π€ for bots, π³οΈ for failed human lookups)
- Enhanced debugging for shell command execution
- Round start/end and team win messages with scores
- Map change messages suppressed until human detected, re-suppressed when all humans leave
- Messages sent when human player (non-BOT SteamID) detected in logs
- Message suppression for non-disconnect events when no humans
Listen port set to 8008 for CS logs.
"""
import asyncio
import datetime
import os
import re
from typing import Optional
from io import BytesIO
try:
import tomllib # py311+
except ModuleNotFoundError:
import tomli as tomllib
import discord
from discord.ext import commands
import geoip2.database
def cc_to_flag(cc: str) -> str:
if not cc or len(cc) != 2:
return "π³οΈ" # White flag for failed GeoIP lookups for humans
base = 127397
return chr(ord(cc[0].upper()) + base) + chr(ord(cc[1].upper()) + base)
# --- Asynchronous RCON Client Class (UDP Version) ---
class RconClient:
def __init__(self, host, port, password):
self.host = host
self.port = port
self.password = password
self.packet_size = 1024
async def get_challenge(self):
loop = asyncio.get_running_loop()
on_done = loop.create_future()
message = b'\xFF\xFF\xFF\xFFgetchallenge\n'
class ChallengeProtocol(asyncio.DatagramProtocol):
def __init__(self):
self.transport = None
self.future = on_done
def connection_made(self, transport):
self.transport = transport
self.transport.sendto(message)
def datagram_received(self, data, addr):
try:
challenge = data[5:].decode().split(' ')[1].strip()
self.future.set_result(challenge)
except (IndexError, UnicodeDecodeError) as e:
self.future.set_exception(ValueError(f"Invalid challenge response: {e}"))
finally:
self.transport.close()
def error_received(self, exc):
self.future.set_exception(exc)
self.transport.close()
try:
transport, protocol = await loop.create_datagram_endpoint(
ChallengeProtocol,
remote_addr=(self.host, self.port)
)
return await asyncio.wait_for(on_done, timeout=5.0)
except asyncio.TimeoutError:
print(f"[{now_iso()}] RCON challenge request timed out.")
except Exception as e:
print(f"[{now_iso()}] [ERROR] Getting RCON challenge: {e}")
return None
async def send_command(self, command: str):
challenge = await self.get_challenge()
if not challenge:
print(f"[{now_iso()}] Cannot send command, failed to get RCON challenge.")
return
loop = asyncio.get_running_loop()
on_done = loop.create_future()
message_buffer = BytesIO()
message_buffer.write(b'\xFF\xFF\xFF\xFF')
message_buffer.write('rcon '.encode())
message_buffer.write(challenge.encode())
message_buffer.write(b' ')
message_buffer.write(self.password.encode())
message_buffer.write(b' ') # New line
message_buffer.write(command.encode()) # New line
message_buffer.write(b'\n')
message_bytes = message_buffer.getvalue()
class CommandProtocol(asyncio.DatagramProtocol):
def __init__(self):
self.transport = None
self.future = on_done
def connection_made(self, transport):
self.transport = transport
self.transport.sendto(message_bytes)
self.future.set_result(None)
self.transport.close()
def error_received(self, exc):
self.future.set_exception(exc)
self.transport.close()
try:
transport, protocol = await loop.create_datagram_endpoint(
CommandProtocol,
remote_addr=(self.host, self.port)
)
await asyncio.wait_for(on_done, timeout=5.0)
print(f"[{now_iso()}] Sent RCON command: {command}")
except asyncio.TimeoutError:
print(f"[{now_iso()}] RCON command timed out.")
except Exception as e:
print(f"[{now_iso()}] [ERROR] Sending RCON command: {e}")
# --- Regexes ---
HL_SAY_REGEXES = [
re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" say "(.*)"(?: \(dead\))?$'),
re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" say_team "(.*)"(?: \(dead\))?$'),
]
CONNECT_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" connected, address "(.+?):(\d+)"$')
JOIN_LEAVE_REGEXES = [
re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" entered the game$'),
re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" disconnected(?: \(reason "(.*?)"\))?$'),
re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" joined team "(.*)"$'),
]
KILL_REGEX = re.compile(
r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" killed "(.+?)<\d+><(.*?)><(.*?)>" with "(.*?)"(?: \(headshot\))?$'
)
SUICIDE_REGEXES = [
re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" committed suicide with "(.*?)"$'),
re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" killed self with "(.*?)"$'),
]
BOMB_REGEX = re.compile(
r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" triggered "(Spawned_With_The_Bomb|Dropped_The_The_Bomb|Got_The_Bomb|Planted_The_Bomb)"'
)
TEAM_BOMB_REGEX = re.compile(
r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: Team "(CT|TERRORIST)" triggered "(Bomb_Defused|Target_Bombed)" \(CT "(\d+)"\) \(T "(\d+)"\)'
)
STEAM_VALIDATE_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" STEAM USERID validated$')
RCON_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: Rcon: "rcon (.+)" from "(.+)"$')
MAPCHANGE_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: Started map "(.+)" \(CRC ".+"')
STARTED_MAP_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: Started map "(.+?)"')
ROUND_START_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: World triggered "Round_Start"$')
ROUND_END_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: World triggered "Round_End"$')
TEAM_WIN_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: Team "(CT|TERRORIST)" triggered "(CTs_Win|Terrorists_Win)" \(CT "(\d+)"\) \(T "(\d+)"\)$')
def now_iso():
return datetime.datetime.now().astimezone().isoformat(timespec="seconds")
class HLLogUDP(asyncio.DatagramProtocol):
def __init__(self, on_line):
super().__init__()
self.on_line = on_line
def datagram_received(self, data: bytes, addr):
try:
text = data.decode('utf-8', errors='replace').strip()
except Exception:
text = repr(data)
for line in text.splitlines():
asyncio.create_task(self.on_line(line, addr))
class CSDiscordBridge:
def __init__(self, cfg: dict):
self.cfg = cfg
intents = discord.Intents.default()
intents.message_content = True
intents.messages = True # Ensure message intents are enabled
self.bot = commands.Bot(command_prefix=cfg.get("discord_prefix", "!"), intents=intents)
self.channel: Optional[discord.TextChannel] = None
self.geoip_reader = None
db_path = self.cfg.get("geoip_db", "/home/csserver/discord/GeoLite2-Country.mmdb")
try:
self.geoip_reader = geoip2.database.Reader(db_path)
except Exception as e:
print(f"[{now_iso()}] [WARN] GeoIP not available: {e}")
self.player_flags = {}
self.connected_players = {}
self.has_human_player = False
self.allow_bot_messages = cfg.get("allow_bot_messages", False)
self.pending_map_change = None
self.suppress_join_messages = False
self.bot.event(self.on_ready)
self.bot.event(self.on_message)
self.rcon_client = RconClient(
self.cfg["cs_host"],
int(self.cfg["cs_port"]),
self.cfg["cs_rcon_password"]
)
async def on_ready(self):
ch_id = int(self.cfg["discord_channel_id"])
try:
self.channel = self.bot.get_channel(ch_id) or await self.bot.fetch_channel(ch_id)
print(f"[{now_iso()}] Logged in as {self.bot.user}. Bridging to #{self.channel.name} ({self.channel.id})")
except Exception as e:
print(f"[{now_iso()}] [ERROR] Failed to fetch Discord channel {ch_id}: {e}")
return
await self._send_startup_commands()
listen_host = self.cfg.get("listen_host", "0.0.0.0")
listen_port = int(self.cfg.get("listen_port", 8008))
loop = asyncio.get_running_loop()
print(f"[{now_iso()}] Listening for HL logs on {listen_host}:{listen_port}")
try:
await loop.create_datagram_endpoint(
lambda: HLLogUDP(self.handle_hl_line),
local_addr=(listen_host, listen_port)
)
except Exception as e:
print(f"[{now_iso()}] [ERROR] Failed to start UDP listener: {e}")
async def _send_startup_commands(self):
print(f"[{now_iso()}] Sending RCON command to disable chat prefixes...")
await self.rcon_client.send_command('amx_chat_prefix ""')
await self.rcon_client.send_command('cs_chat_prefix ""')
await self.rcon_client.send_command('sv_say_prefix ""')
async def on_message(self, message: discord.Message):
print(f"[{now_iso()}] [DEBUG] Received Discord message: author={message.author}, channel={message.channel.id}, content={message.content}")
if message.author.bot:
print(f"[{now_iso()}] [DEBUG] Ignoring message from bot: {message.author}")
return
if not self.channel or message.channel.id != self.channel.id:
print(f"[{now_iso()}] [DEBUG] Message from wrong channel: {message.channel.id}, expected {self.channel.id if self.channel else 'None'}")
return
prefix = self.cfg.get("discord_prefix", "!")
if message.content.startswith(prefix):
print(f"[{now_iso()}] [DEBUG] Ignoring command with prefix: {message.content}")
return
content = self._sanitize_discord(message.content)
author = message.author.display_name
line = f'say (DISCORD) {author}: {content}'
print(f"[{now_iso()}] [DEBUG] Processing Discord message: {line} (has_human_player={self.has_human_player})")
await self._send_rcon_say(line)
async def _send_rcon_say(self, command: str):
limit = self.cfg.get("say_length_limit", 190)
if len(command) > limit:
command = command[:limit-1] + "β¦"
print(f"[{now_iso()}] [DEBUG] Sending RCON command to CS: {command}")
await self.rcon_client.send_command(command)
async def handle_hl_line(self, line: str, addr):
if "Server cvar" in line:
return
print(f"[{now_iso()}] Received log: {line}")
# Strip potential garbage prefixes like οΏ½οΏ½οΏ½οΏ½log
cleaned_line = line
if line.startswith('\ufffd\ufffd\ufffd\ufffdlog'):
cleaned_line = line[8:].strip()
print(f"[{now_iso()}] Stripped garbage prefix, cleaned log: {cleaned_line}")
log_start_match = re.search(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}:', cleaned_line)
if not log_start_match:
print(f"[{now_iso()}] Unmatched log (no valid start): {cleaned_line}")
if "connected, address" in cleaned_line and self._parse_connected(cleaned_line):
return
return
cleaned_line = cleaned_line[log_start_match.start():].strip()
print(f"[{now_iso()}] Cleaned log: {cleaned_line}")
map_change = self._parse_mapchange(cleaned_line)
if map_change:
print(f"[{now_iso()}] Detected map change: {map_change}. Storing for later.")
self.pending_map_change = map_change
self.suppress_join_messages = True
await self._post_to_discord(map_change, force=True)
return
if self._parse_steam_validated(cleaned_line):
return
if self._parse_rcon(cleaned_line):
return
if self._parse_connected(cleaned_line):
return
joinleave = self._parse_join_leave(cleaned_line)
if joinleave:
print(f"[{now_iso()}] Parsed join/leave: {joinleave} (has_human_player={self.has_human_player})")
await self._post_to_discord(joinleave)
return
say, team_flag, payload = self._parse_say(cleaned_line)
if say:
print(f"[{now_iso()}] Parsed say (team={team_flag}): {payload} (has_human_player={self.has_human_player})")
await self._post_to_discord(payload)
return
kill = self._parse_kill(cleaned_line)
if kill:
print(f"[{now_iso()}] Parsed kill: {kill} (has_human_player={self.has_human_player})")
await self._post_to_discord(kill)
return
suicide = self._parse_suicide(cleaned_line)
if suicide:
print(f"[{now_iso()}] Parsed suicide: {suicide} (has_human_player={self.has_human_player})")
await self._post_to_discord(suicide)
return
bomb = self._parse_bomb(cleaned_line)
if bomb:
print(f"[{now_iso()}] Parsed bomb event: {bomb} (has_human_player={self.has_human_player})")
await self._post_to_discord(bomb)
return
if ROUND_START_REGEX.search(cleaned_line):
payload = "π Round started!"
print(f"[{now_iso()}] Parsed round start: {payload} (has_human_player={self.has_human_player})")
self.suppress_join_messages = False
await self._post_to_discord(payload)
return
if ROUND_END_REGEX.search(cleaned_line):
payload = "π Round ended!"
print(f"[{now_iso()}] Parsed round end: {payload} (has_human_player={self.has_human_player})")
await self._post_to_discord(payload)
return
m = TEAM_WIN_REGEX.search(cleaned_line)
if m:
team, _, ct_score, t_score = m.groups()
if team == "CT":
payload = f"π΅ **Counter-Terrorists Win!** (CT {ct_score}, T {t_score})"
else:
payload = f"π΄ **Terrorists Win!** (CT {ct_score}, T {t_score})"
print(f"[{now_iso()}] Parsed team win: {payload} (has_human_player={self.has_human_player})")
await self._post_to_discord(payload)
return
print(f"[{now_iso()}] Unmatched log: {cleaned_line} (has_human_player={self.has_human_player})")
def _parse_mapchange(self, line: str) -> Optional[str]:
m_mapchange = re.compile(
r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: -------- Mapchange to (.+?) --------$'
).search(line)
m_started_map = re.compile(
r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: Started map "(.+?)"'
).search(line)
map_name = None
if m_mapchange:
map_name = m_mapchange.group(1)
elif m_started_map:
map_name = m_started_map.group(1)
if map_name:
# snapshot current state before reset
had_humans = self.has_human_player
# reset for the new map
self.has_human_player = False
self.connected_players = {}
self.suppress_join_messages = True
# Only announce if humans were present or bot messages are allowed
if had_humans or self.allow_bot_messages:
return f"π Map changed to `{map_name}`"
else:
print(f"[{now_iso()}] Suppressing mapchange announcement ({map_name}) β no human players were online.")
return None
return None
def _parse_steam_validated(self, line: str) -> bool:
m = STEAM_VALIDATE_REGEX.search(line)
if m:
name, steamid, team = m.groups()
print(f"[{now_iso()}] Handling 'STEAM USERID validated' for {name} ({steamid}).")
if steamid != "BOT":
if "STEAM_ID_PENDING" in self.connected_players:
pending_name = self.connected_players.pop("STEAM_ID_PENDING")
self.connected_players[steamid] = pending_name
if "STEAM_ID_PENDING" in self.player_flags:
self.player_flags[steamid] = self.player_flags.pop("STEAM_ID_PENDING")
print(f"[{now_iso()}] Transferred flag for {pending_name} from STEAM_ID_PENDING to {steamid}: {self.player_flags[steamid]}")
return True
return False
def _parse_rcon(self, line: str) -> bool:
m = RCON_REGEX.search(line)
if m:
command, _ = m.groups()
if not command.strip(): # Ignore incomplete RCON commands
print(f"[{now_iso()}] [DEBUG] Ignored incomplete RCON command: {line}")
return True
print(f"[{now_iso()}] Handled RCON command log: {command}")
return True
return False
def _parse_connected(self, line: str) -> bool:
m = CONNECT_REGEX.search(line)
if m:
name, steamid, team, ip, port = m.groups()
print(f"[{now_iso()}] [DEBUG] Matched CONNECT_REGEX for: {name} ({steamid}, IP {ip})")
if steamid == "BOT":
print(f"[{now_iso()}] Bot connected: {name} ({steamid})")
self.player_flags[steamid] = "π€"
print(f"[{now_iso()}] Assigned flag code for {name} ({steamid}): 'None' -> 'π€'")
return True
self.connected_players[steamid] = name
self.has_human_player = True
flag = "π³οΈ"
country_code = None
if self.geoip_reader:
try:
response = self.geoip_reader.country(ip)
country_code = response.country.iso_code
if country_code:
flag = cc_to_flag(country_code)
print(f"[{now_iso()}] GeoIP lookup for {name} ({steamid}, IP {ip}): country code '{country_code}' -> flag '{flag}'")
else:
print(f"[{now_iso()}] [WARN] No country code found for IP {ip}")
except Exception as e:
print(f"[{now_iso()}] [ERROR] GeoIP lookup failed for IP {ip}: {e}")
else:
print(f"[{now_iso()}] [ERROR] GeoIP reader not initialized for {name} ({steamid})")
self.player_flags[steamid] = flag
print(f"[{now_iso()}] Assigned flag code for {name} ({steamid}): '{country_code or 'None'}' -> '{flag}'")
print(f"[{now_iso()}] [DEBUG] GeoIP database path: {self.cfg.get('geoip_db', '/home/csserver/discord/GeoLite2-Country.mmdb')}")
print(f"[{now_iso()}] [DEBUG] Player flags dictionary: {self.player_flags}")
return True
else:
print(f"[{now_iso()}] [DEBUG] CONNECT_REGEX failed to match: {line}")
return False
def _parse_join_leave(self, line: str) -> Optional[str]:
# Ignore raw connection lines like: " from "64.188.91.127:53453"
if ' from "' in line:
print(f"[{now_iso()}] Ignoring connection info line: {line.strip()}")
return None
entered_the_game_regex = re.compile(
r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)?>" entered the game$'
)
m = entered_the_game_regex.search(line)
if m:
name, steamid, team = m.groups()[:3]
if steamid == "BOT":
self.player_flags[steamid] = "π€"
print(f"[{now_iso()}] Stored flag for {name} ({steamid}): π€")
print(f"[{now_iso()}] Ignoring bot join message for {name}.")
return None
entered_the_game_regex = re.compile(
r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)?>" entered the game$'
)
m = entered_the_game_regex.search(line)
if m:
name, steamid, team = m.groups()[:3]
if steamid == "BOT":
self.player_flags[steamid] = "π€"
print(f"[{now_iso()}] Stored flag for {name} ({steamid}): π€")
print(f"[{now_iso()}] Ignoring bot join message for {name}.")
return None
if self.suppress_join_messages:
print(f"[{now_iso()}] Suppressing join message for {name} ({steamid}) during map change.")
self.connected_players[steamid] = name
self.has_human_player = True
return None
self.connected_players[steamid] = name
self.has_human_player = True
name = self._sanitize_name(name)
flag = self.player_flags.get(steamid, "π³οΈ")
if flag == "π³οΈ" and steamid != "BOT":
print(f"[{now_iso()}] [WARN] No flag found for {name} ({steamid}), attempting GeoIP lookup")
m_connect = CONNECT_REGEX.search(line)
if m_connect:
_, _, _, ip, _ = m_connect.groups()
if self.geoip_reader and ip:
try:
response = self.geoip_reader.country(ip)
country_code = response.country.iso_code
if country_code:
flag = cc_to_flag(country_code)
self.player_flags[steamid] = flag
print(f"[{now_iso()}] Fallback GeoIP lookup for {name} ({steamid}, IP {ip}): country code '{country_code}' -> flag '{flag}'")
else:
print(f"[{now_iso()}] [WARN] No country code found for IP {ip} in fallback lookup")
except Exception as e:
print(f"[{now_iso()}] [ERROR] Fallback GeoIP lookup failed for IP {ip}: {e}")
print(f"[{now_iso()}] Human player joined: {name} ({steamid}), flag={flag}, has_human_player={self.has_human_player}")
return f'π {flag} `{name}` joined the server'
for rx in JOIN_LEAVE_REGEXES:
m = rx.search(line)
if m:
if "disconnected" in line:
name, steamid, _, reason = m.groups()
if steamid == "BOT":
print(f"[{now_iso()}] Ignoring bot leave message for {name}.")
return None
if self.suppress_join_messages:
print(f"[{now_iso()}] Suppressing leave message for {name} ({steamid}) during map change window")
if steamid in self.connected_players:
del self.connected_players[steamid]
return None
name = self._sanitize_name(name)
flag = self.player_flags.get(steamid, "π³οΈ")
leave_message = f'β {flag} `{name}` left the server' + (f' ({reason})' if reason else '')
if steamid in self.connected_players:
del self.connected_players[steamid]
if not any(sid != "BOT" for sid in self.connected_players):
self.has_human_player = False
print(f"[{now_iso()}] No human players remain, has_human_player={self.has_human_player}")
print(f"[{now_iso()}] Human player left: {name} ({steamid}), flag={flag}, has_human_player={self.has_human_player}")
return leave_message
elif "joined team" in line:
name, steamid, team, _ = m.groups()
if steamid == "BOT":
self.player_flags[steamid] = "π€"
print(f"[{now_iso()}] Stored flag for {name} ({steamid}): π€")
print(f"[{now_iso()}] Ignoring bot joined team message.")
return None
self.has_human_player = True
print(f"[{now_iso()}] Human detected in joined team: {name} ({steamid}), has_human_player={self.has_human_player}")
return None
return None
def _parse_say(self, line: str):
for rx in HL_SAY_REGEXES:
m = rx.search(line)
if m:
name, steamid, team, msg = m.groups()
team_flag = "say_team" in rx.pattern or "say_team" in line
name = self._sanitize_name(name)
msg = self._sanitize_cs(msg)
flag = self.player_flags.get(steamid, "π€" if steamid == "BOT" else "π³οΈ")
if steamid != "BOT":
self.has_human_player = True
print(f"[{now_iso()}] Human detected in say: {name} ({steamid}), has_human_player={self.has_human_player}")
team_icon = "π΅" if team.upper().startswith("CT") else "π΄" if team.upper().startswith("T") else "βͺ"
dead_indicator = " πͺ¦" if "(dead)" in line else ""
payload = f'π¬ {flag} {team_icon} `{name}`{dead_indicator}: {msg}'
return True, team_flag, payload
return False, False, None
def _parse_kill(self, line: str) -> Optional[str]:
m = KILL_REGEX.search(line)
if not m:
return None
killer_name, killer_id, killer_team, victim_name, victim_id, victim_team, weapon = m.groups()
if killer_id == "BOT" and victim_id == "BOT":
print(f"[{now_iso()}] Ignoring kill message (bot killed bot).")
return None
if killer_id != "BOT" or victim_id != "BOT":
self.has_human_player = True
print(f"[{now_iso()}] Human detected in kill: {killer_name} ({killer_id}) or {victim_name} ({victim_id}), has_human_player={self.has_human_player}")
killer_flag = self.player_flags.get(killer_id, "π€" if killer_id == "BOT" else "π³οΈ")
victim_flag = self.player_flags.get(victim_id, "π€" if victim_id == "BOT" else "π³οΈ")
killer_name = self._sanitize_name(killer_name)
victim_name = self._sanitize_name(victim_name)
headshot = " π₯" if "(headshot)" in line else ""
killer_team_icon = "π΅" if killer_team.upper().startswith("CT") else "π΄" if killer_team.upper().startswith("T") else "βͺ"
victim_team_icon = "π΅" if victim_team.upper().startswith("CT") else "π΄" if victim_team.upper().startswith("T") else "βͺ"
# Determine article
vowel_sounds = ('a', 'e', 'i', 'o', 'u')
exceptions = {"tmp", "usp", "ump", "uzi"} # extend if needed
if weapon.lower() in exceptions:
article = "a"
else:
article = "an" if weapon.lower().startswith(vowel_sounds) else "a"
return f'{killer_flag} {killer_team_icon} `{killer_name}` killed {victim_flag} {victim_team_icon} `{victim_name}` with {article} *{weapon}*{headshot}'
def _parse_suicide(self, line: str) -> Optional[str]:
for rx in SUICIDE_REGEXES:
m = rx.search(line)
if m:
name, steamid, _, weapon = m.groups()
if steamid != "BOT":
self.has_human_player = True
print(f"[{now_iso()}] Human detected in suicide: {name} ({steamid}), has_human_player={self.has_human_player}")
name = self._sanitize_name(name)
flag = self.player_flags.get(steamid, "π€" if steamid == "BOT" else "π³οΈ")
return f'π {flag} `{name}` died ({weapon})'
return None
def _parse_bomb(self, line: str) -> Optional[str]:
m = BOMB_REGEX.search(line)
if m:
name, steamid, _, event = m.groups()
name = self._sanitize_name(name)
flag = self.player_flags.get(steamid, "π€" if steamid == "BOT" else "π³οΈ")
if steamid != "BOT":
self.has_human_player = True
print(f"[{now_iso()}] Human detected in bomb event: {name} ({steamid}), has_human_player={self.has_human_player}")
if event == "Planted_The_Bomb":
return 'π£ *The bomb has been planted!*'
m = TEAM_BOMB_REGEX.search(line)
if m:
team, event, ct_score, t_score = m.groups()
if event == "Bomb_Defused":
return 'π£ *The bomb has been defused!*'
elif event == "Target_Bombed":
return 'π£ *Target successfully bombed!*'
return None
async def _post_to_discord(self, content: str, force=False):
if not self.channel:
print(f"[{now_iso()}] [ERROR] Discord channel not set, cannot post: {content}")
return
# Always allow join/leave messages through (mapchange suppression already handled in _parse_join_leave)
if force or content.startswith(("π", "β")) or self.has_human_player or self.allow_bot_messages:
try:
await self.channel.send(content)
print(f"[{now_iso()}] Successfully posted to Discord: {content} (has_human_player={self.has_human_player})")
except Exception as e:
print(f"[{now_iso()}] [ERROR] Failed to send to Discord: {e} (content={content})")
else:
print(f"[{now_iso()}] Suppressing Discord message due to no human players: {content} (has_human_player={self.has_human_player})")
def _sanitize_discord(self, s: str) -> str:
s = s.replace('"', '').strip()
return ''.join(ch for ch in s if 31 < ord(ch) < 127)
def _sanitize_cs(self, s: str) -> str:
s = s.replace('"', "'")
return ''.join(ch for ch in s if 31 < ord(ch) < 127)
def _sanitize_name(self, s: str) -> str:
s = s.strip()
if len(s) > 24:
s = s[:23] + "β¦"
return self._sanitize_cs(s)
async def run(self):
listen_host = self.cfg.get("listen_host", "0.0.0.0")
listen_port = int(self.cfg.get("listen_port", 8008))
loop = asyncio.get_running_loop()
print(f"[{now_iso()}] Listening for HL logs on {listen_host}:{listen_port}")
try:
await loop.create_datagram_endpoint(
lambda: HLLogUDP(self.handle_hl_line),
local_addr=(listen_host, listen_port)
)
except Exception as e:
print(f"[{now_iso()}] [ERROR] Failed to start UDP listener: {e}")
await self.bot.start(self.cfg["discord_token"])
def load_config(path: str) -> dict:
try:
with open(path, "rb") as f:
cfg = tomllib.load(f)
except Exception as e:
print(f"[{now_iso()}] [ERROR] Failed to load config {path}: {e}")
raise SystemExit(f"Failed to load config: {e}")
required = ["discord_token", "discord_channel_id", "cs_host", "cs_port", "cs_rcon_password"]
for key in required:
if key not in cfg:
print(f"[{now_iso()}] [ERROR] Missing required config key: {key}")
raise SystemExit(f"Missing required config key: {key}")
return cfg
async def amain(cfg_path: str):
cfg = load_config(cfg_path)
bridge = CSDiscordBridge(cfg)
await bridge.run()
def main():
cfg_path = os.environ.get("CSBRIDGE_CONFIG", "config.toml")
try:
asyncio.run(amain(cfg_path))
except KeyboardInterrupt:
print(f"[{now_iso()}] Exiting...")
except Exception as e:
print(f"[{now_iso()}] [ERROR] Fatal error: {e}")
if __name__ == "__main__":
main()