Counter-Strike 1.6 Discord Bot

Announcing: A Smart Two-Way Bridge for Counter-Strike 1.6 and Discord!

Ever wanted to see what’s happening on your classic Counter-Strike 1.6 server right from Discord? Or maybe let your Discord community chat with players in-game, even if they’re not at their PC?

I’ve built a new Python bot that does just that, creating a seamless, two-way bridge between your CS 1.6 server and a designated Discord channel. It’s a lightweight, smart solution to keep your community connected.

What It Does: The Key Features

This isn’t just a simple log-dumper. It’s a smart bridge designed to be useful, not noisy.

  • Smart Human Detection: This is the best part. The bot stays quiet and suppresses all server messages (like map changes or bot-only kills) when the server is empty. As soon as the first human player joins, the bot wakes up and starts relaying all activity. When the last human leaves, it goes quiet again. No more spamming your Discord channel when no one is playing!
  • Two-Way Chat: Messages from Discord are sent directly into the in-game chat. In-game messages (
    say
    and
    say_team
    ) are relayed to Discord, complete with team icons (πŸ”΅/πŸ”΄) and even a tombstone emoji (πŸͺ¦) if the player is dead.
  • Full Game Event Reporting: Get real-time updates for all important events:
    • Kills:
      Player A killed Player B with an awp πŸ’₯
    • Joins/Leaves:
      πŸ‘‹ Player C joined the server
    • Bomb Events:
      πŸ’£ The bomb has been planted!
    • Round/Win Events:
      πŸ”„ Round started!
      or
      πŸ”΄ Terrorists Win! (CT 5, T 2)
  • GeoIP Player Flags: See where your players are from! The bot automatically looks up player IPs and assigns a country flag (e.g., πŸ‡¨πŸ‡¦, πŸ‡ΊπŸ‡Έ). Bots get a πŸ€– icon, and any failed human lookups get a 🏳️.

How It Works

The bot is built on two simple principles:

  1. UDP Log Listener: You configure your CS 1.6 server to send its logs in real-time to the bot (using the
    logaddress_add
    command). The bot listens on a port (default:
    8009
    ), parses these logs, and formats them for Discord.
  2. Asynchronous RCON Client: When a user types a message in your Discord channel, the bot uses RCON (the server’s remote-control protocol) to send a
    say
    command to the game, broadcasting the message to all players.

How to Set Up Your Own Bridge

Ready to try it? You’ll need a server (a simple VPS or even a home machine) to run the Python script.

Step 1: Get the Script & Install Dependencies

First, save the Python code above as

cs_discord_bridge.py
.

You’ll need Python 3 installed. Then, install the required Python libraries using pip:

Bash


pip install discord.py geoip2 tomli

(Note:

tomli
is used as a fallback for
tomllib
on Python versions older than 3.11).

Step 2: Get the GeoIP Database

For the country flag feature, you need to download the free GeoLite2 Country database from MaxMind.

  1. Go to the MaxMind GeoLite2 free database page and sign up.
  2. Download the “GeoLite2-Country” database (it will be a
    .mmdb
    file).
  3. Place this file (e.g.,
    GeoLite2-Country.mmdb
    ) on your server in a location the bot can access.

Step 3: Create Your
config.toml

Next, create a file named

config.toml
in the same directory as your Python script. This is where you’ll put all your settings.

Here is a template. You must fill in the required values.

Ini, TOML


# --- Required Settings ---

# Your Discord Bot Token (get from Discord Developer Portal)
discord_token = "YOUR_BOT_TOKEN_HERE"

# The ID of the Discord channel you want to bridge
discord_channel_id = "YOUR_CHANNEL_ID_HERE"

# Your Counter-Strike 1.6 server's IP or hostname
cs_host = "12.34.56.78"

# Your Counter-Strike 1.6 server's RCON port (usually the same as game port)
cs_port = 27015

# Your server's RCON password (from server.cfg)
cs_rcon_password = "YOUR_RCON_PASSWORD_HERE"

# --- Optional Settings ---

# Path to the GeoIP database you downloaded in Step 2
geoip_db = "/path/to/GeoLite2-Country.mmdb"

# The IP address for the bot to listen on.
# "0.0.0.0" is usually correct to listen on all available IPs.
listen_host = "0.0.0.0"

# The port for the bot to listen for CS logs on.
# This MUST match what you set in Step 4.
listen_port = 8009

# Set to 'true' to allow bot-on-bot kills and other bot-only
# activity to be posted even when no humans are on.
allow_bot_messages = false

# The prefix for bot commands in Discord (e.g., !players)
# The bridge will ignore messages starting with this.
discord_prefix = "!"
  • To get a Bot Token: You need to create an “Application” in the Discord Developer Portal. Create a Bot, and be sure to enable the Message Content Intent under the “Bot” tab.
  • To get a Channel ID: In Discord, enable Developer Mode (Settings > Advanced), then right-click your channel and select “Copy ID”.

Step 4: Configure Your CS 1.6 Server

Now, tell your Counter-Strike server to send its logs to your bot.

Add the following line to your server’s

server.cfg
or
autoexec.cfg
. Replace
BOT_SERVER_IP
with the IP of the machine where your Python script is running, and make sure the port matches your
listen_port
from
config.toml
.


log on
logaddress_add BOT_SERVER_IP:8009

You will need to restart your server (or change maps) for this to take effect.

Step 5: Run the Bot!

You’re all set. Go to the directory with your script and config file and run:

Bash


python3 cs_discord_bridge.py

If all goes well, you’ll see console messages indicating it has logged into Discord and is listening for logs. Now, when you join your server, you should see the activity pop up in your Discord channel!


That’s it! You now have a smart, modern bridge to your classic CS 1.6 server. Feel free to grab the code and try it out.

https://sbmesh.com/bot.py


#!/usr/bin/env python3
"""
cs_discord_bridge.py β€” Two-way Discord ↔ Counter-Strike 1.6 bridge
Features:
 - Chat relay (say, say_team) with tombstone for dead players
 - Join / leave (posts leave messages always)
 - Kill events (killer β†’ victim, with flags, headshots)
 - Suicide / world kills
 - Bomb events (spawned, dropped, got, planted, defused, bombed) with πŸ’£ emoji
 - Discord β†’ CS chat via asynchronous RCON client
 - Consistent GeoIP flags (πŸ€– for bots, 🏳️ for failed human lookups)
 - Enhanced debugging for shell command execution
 - Round start/end and team win messages with scores
 - Map change messages suppressed until human detected, re-suppressed when all humans leave
 - Messages sent when human player (non-BOT SteamID) detected in logs
 - Message suppression for non-disconnect events when no humans

Listen port set to 8008 for CS logs.
"""

import asyncio
import datetime
import os
import re
from typing import Optional
from io import BytesIO

try:
    import tomllib  # py311+
except ModuleNotFoundError:
    import tomli as tomllib

import discord
from discord.ext import commands
import geoip2.database

def cc_to_flag(cc: str) -> str:
    if not cc or len(cc) != 2:
        return "🏳️"  # White flag for failed GeoIP lookups for humans
    base = 127397
    return chr(ord(cc[0].upper()) + base) + chr(ord(cc[1].upper()) + base)

# --- Asynchronous RCON Client Class (UDP Version) ---
class RconClient:
    def __init__(self, host, port, password):
        self.host = host
        self.port = port
        self.password = password
        self.packet_size = 1024

    async def get_challenge(self):
        loop = asyncio.get_running_loop()
        on_done = loop.create_future()
       
        message = b'\xFF\xFF\xFF\xFFgetchallenge\n'

        class ChallengeProtocol(asyncio.DatagramProtocol):
            def __init__(self):
                self.transport = None
                self.future = on_done

            def connection_made(self, transport):
                self.transport = transport
                self.transport.sendto(message)

            def datagram_received(self, data, addr):
                try:
                    challenge = data[5:].decode().split(' ')[1].strip()
                    self.future.set_result(challenge)
                except (IndexError, UnicodeDecodeError) as e:
                    self.future.set_exception(ValueError(f"Invalid challenge response: {e}"))
                finally:
                    self.transport.close()

            def error_received(self, exc):
                self.future.set_exception(exc)
                self.transport.close()
       
        try:
            transport, protocol = await loop.create_datagram_endpoint(
                ChallengeProtocol,
                remote_addr=(self.host, self.port)
            )
            return await asyncio.wait_for(on_done, timeout=5.0)
        except asyncio.TimeoutError:
            print(f"[{now_iso()}] RCON challenge request timed out.")
        except Exception as e:
            print(f"[{now_iso()}] [ERROR] Getting RCON challenge: {e}")
        return None

    async def send_command(self, command: str):
            challenge = await self.get_challenge()
            if not challenge:
                print(f"[{now_iso()}] Cannot send command, failed to get RCON challenge.")
                return

            loop = asyncio.get_running_loop()
            on_done = loop.create_future()

            message_buffer = BytesIO()
            message_buffer.write(b'\xFF\xFF\xFF\xFF')
            message_buffer.write('rcon '.encode())
            message_buffer.write(challenge.encode())
            message_buffer.write(b' ')
            message_buffer.write(self.password.encode())
            message_buffer.write(b' ') # New line
            message_buffer.write(command.encode()) # New line
            message_buffer.write(b'\n')

            message_bytes = message_buffer.getvalue()

            class CommandProtocol(asyncio.DatagramProtocol):
                def __init__(self):
                    self.transport = None
                    self.future = on_done

                def connection_made(self, transport):
                    self.transport = transport
                    self.transport.sendto(message_bytes)
                    self.future.set_result(None)
                    self.transport.close()

                def error_received(self, exc):
                    self.future.set_exception(exc)
                    self.transport.close()

            try:
                transport, protocol = await loop.create_datagram_endpoint(
                    CommandProtocol,
                    remote_addr=(self.host, self.port)
                )
                await asyncio.wait_for(on_done, timeout=5.0)
                print(f"[{now_iso()}] Sent RCON command: {command}")
            except asyncio.TimeoutError:
                print(f"[{now_iso()}] RCON command timed out.")
            except Exception as e:
                print(f"[{now_iso()}] [ERROR] Sending RCON command: {e}")

# --- Regexes ---
HL_SAY_REGEXES = [
    re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" say "(.*)"(?: \(dead\))?$'),
    re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" say_team "(.*)"(?: \(dead\))?$'),
]

CONNECT_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" connected, address "(.+?):(\d+)"$')

JOIN_LEAVE_REGEXES = [
    re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" entered the game$'),
    re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" disconnected(?: \(reason "(.*?)"\))?$'),
    re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" joined team "(.*)"$'),
]

KILL_REGEX = re.compile(
    r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" killed "(.+?)<\d+><(.*?)><(.*?)>" with "(.*?)"(?: \(headshot\))?$'
)

SUICIDE_REGEXES = [
    re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" committed suicide with "(.*?)"$'),
    re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" killed self with "(.*?)"$'),
]

BOMB_REGEX = re.compile(
    r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" triggered "(Spawned_With_The_Bomb|Dropped_The_The_Bomb|Got_The_Bomb|Planted_The_Bomb)"'
)

TEAM_BOMB_REGEX = re.compile(
    r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: Team "(CT|TERRORIST)" triggered "(Bomb_Defused|Target_Bombed)" \(CT "(\d+)"\) \(T "(\d+)"\)'
)

STEAM_VALIDATE_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)>" STEAM USERID validated$')

RCON_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: Rcon: "rcon (.+)" from "(.+)"$')

MAPCHANGE_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: Started map "(.+)" \(CRC ".+"')

STARTED_MAP_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: Started map "(.+?)"')

ROUND_START_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: World triggered "Round_Start"$')
ROUND_END_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: World triggered "Round_End"$')
TEAM_WIN_REGEX = re.compile(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: Team "(CT|TERRORIST)" triggered "(CTs_Win|Terrorists_Win)" \(CT "(\d+)"\) \(T "(\d+)"\)$')

def now_iso():
    return datetime.datetime.now().astimezone().isoformat(timespec="seconds")

class HLLogUDP(asyncio.DatagramProtocol):
    def __init__(self, on_line):
        super().__init__()
        self.on_line = on_line

    def datagram_received(self, data: bytes, addr):
        try:
            text = data.decode('utf-8', errors='replace').strip()
        except Exception:
            text = repr(data)
        for line in text.splitlines():
            asyncio.create_task(self.on_line(line, addr))

class CSDiscordBridge:
    def __init__(self, cfg: dict):
        self.cfg = cfg
        intents = discord.Intents.default()
        intents.message_content = True
        intents.messages = True  # Ensure message intents are enabled
        self.bot = commands.Bot(command_prefix=cfg.get("discord_prefix", "!"), intents=intents)
        self.channel: Optional[discord.TextChannel] = None

        self.geoip_reader = None
        db_path = self.cfg.get("geoip_db", "/home/csserver/discord/GeoLite2-Country.mmdb")
        try:
            self.geoip_reader = geoip2.database.Reader(db_path)
        except Exception as e:
            print(f"[{now_iso()}] [WARN] GeoIP not available: {e}")
        self.player_flags = {}

        self.connected_players = {}
        self.has_human_player = False
        self.allow_bot_messages = cfg.get("allow_bot_messages", False)
        self.pending_map_change = None
        self.suppress_join_messages = False

        self.bot.event(self.on_ready)
        self.bot.event(self.on_message)

        self.rcon_client = RconClient(
            self.cfg["cs_host"],
            int(self.cfg["cs_port"]),
            self.cfg["cs_rcon_password"]
        )

    async def on_ready(self):
        ch_id = int(self.cfg["discord_channel_id"])
        try:
            self.channel = self.bot.get_channel(ch_id) or await self.bot.fetch_channel(ch_id)
            print(f"[{now_iso()}] Logged in as {self.bot.user}. Bridging to #{self.channel.name} ({self.channel.id})")
        except Exception as e:
            print(f"[{now_iso()}] [ERROR] Failed to fetch Discord channel {ch_id}: {e}")
            return
       
        await self._send_startup_commands()

        listen_host = self.cfg.get("listen_host", "0.0.0.0")
        listen_port = int(self.cfg.get("listen_port", 8008))
        loop = asyncio.get_running_loop()
        print(f"[{now_iso()}] Listening for HL logs on {listen_host}:{listen_port}")
        try:
            await loop.create_datagram_endpoint(
                lambda: HLLogUDP(self.handle_hl_line),
                local_addr=(listen_host, listen_port)
            )
        except Exception as e:
            print(f"[{now_iso()}] [ERROR] Failed to start UDP listener: {e}")

    async def _send_startup_commands(self):
        print(f"[{now_iso()}] Sending RCON command to disable chat prefixes...")
        await self.rcon_client.send_command('amx_chat_prefix ""')
        await self.rcon_client.send_command('cs_chat_prefix ""')
        await self.rcon_client.send_command('sv_say_prefix ""')

    async def on_message(self, message: discord.Message):
        print(f"[{now_iso()}] [DEBUG] Received Discord message: author={message.author}, channel={message.channel.id}, content={message.content}")
        if message.author.bot:
            print(f"[{now_iso()}] [DEBUG] Ignoring message from bot: {message.author}")
            return
        if not self.channel or message.channel.id != self.channel.id:
            print(f"[{now_iso()}] [DEBUG] Message from wrong channel: {message.channel.id}, expected {self.channel.id if self.channel else 'None'}")
            return

        prefix = self.cfg.get("discord_prefix", "!")
        if message.content.startswith(prefix):
            print(f"[{now_iso()}] [DEBUG] Ignoring command with prefix: {message.content}")
            return

        content = self._sanitize_discord(message.content)
        author = message.author.display_name
       
        line = f'say (DISCORD) {author}: {content}'
        print(f"[{now_iso()}] [DEBUG] Processing Discord message: {line} (has_human_player={self.has_human_player})")
        await self._send_rcon_say(line)

    async def _send_rcon_say(self, command: str):
        limit = self.cfg.get("say_length_limit", 190)
        if len(command) > limit:
            command = command[:limit-1] + "…"
       
        print(f"[{now_iso()}] [DEBUG] Sending RCON command to CS: {command}")
        await self.rcon_client.send_command(command)

    async def handle_hl_line(self, line: str, addr):
        if "Server cvar" in line:
            return
        print(f"[{now_iso()}] Received log: {line}")

        # Strip potential garbage prefixes like οΏ½οΏ½οΏ½οΏ½log
        cleaned_line = line
        if line.startswith('\ufffd\ufffd\ufffd\ufffdlog'):
            cleaned_line = line[8:].strip()
            print(f"[{now_iso()}] Stripped garbage prefix, cleaned log: {cleaned_line}")

        log_start_match = re.search(r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}:', cleaned_line)
        if not log_start_match:
            print(f"[{now_iso()}] Unmatched log (no valid start): {cleaned_line}")
            if "connected, address" in cleaned_line and self._parse_connected(cleaned_line):
                return
            return

        cleaned_line = cleaned_line[log_start_match.start():].strip()
        print(f"[{now_iso()}] Cleaned log: {cleaned_line}")

        map_change = self._parse_mapchange(cleaned_line)
        if map_change:
            print(f"[{now_iso()}] Detected map change: {map_change}. Storing for later.")
            self.pending_map_change = map_change
            self.suppress_join_messages = True
            await self._post_to_discord(map_change, force=True)
            return
       
        if self._parse_steam_validated(cleaned_line):
            return
           
        if self._parse_rcon(cleaned_line):
            return

        if self._parse_connected(cleaned_line):
            return

        joinleave = self._parse_join_leave(cleaned_line)
        if joinleave:
            print(f"[{now_iso()}] Parsed join/leave: {joinleave} (has_human_player={self.has_human_player})")
            await self._post_to_discord(joinleave)
            return

        say, team_flag, payload = self._parse_say(cleaned_line)
        if say:
            print(f"[{now_iso()}] Parsed say (team={team_flag}): {payload} (has_human_player={self.has_human_player})")
            await self._post_to_discord(payload)
            return

        kill = self._parse_kill(cleaned_line)
        if kill:
            print(f"[{now_iso()}] Parsed kill: {kill} (has_human_player={self.has_human_player})")
            await self._post_to_discord(kill)
            return

        suicide = self._parse_suicide(cleaned_line)
        if suicide:
            print(f"[{now_iso()}] Parsed suicide: {suicide} (has_human_player={self.has_human_player})")
            await self._post_to_discord(suicide)
            return

        bomb = self._parse_bomb(cleaned_line)
        if bomb:
            print(f"[{now_iso()}] Parsed bomb event: {bomb} (has_human_player={self.has_human_player})")
            await self._post_to_discord(bomb)
            return

        if ROUND_START_REGEX.search(cleaned_line):
            payload = "πŸ”„ Round started!"
            print(f"[{now_iso()}] Parsed round start: {payload} (has_human_player={self.has_human_player})")
            self.suppress_join_messages = False
            await self._post_to_discord(payload)
            return
           
        if ROUND_END_REGEX.search(cleaned_line):
            payload = "🏁 Round ended!"
            print(f"[{now_iso()}] Parsed round end: {payload} (has_human_player={self.has_human_player})")
            await self._post_to_discord(payload)
            return
           
        m = TEAM_WIN_REGEX.search(cleaned_line)
        if m:
            team, _, ct_score, t_score = m.groups()
            if team == "CT":
                payload = f"πŸ”΅ **Counter-Terrorists Win!** (CT {ct_score}, T {t_score})"
            else:
                payload = f"πŸ”΄ **Terrorists Win!** (CT {ct_score}, T {t_score})"
            print(f"[{now_iso()}] Parsed team win: {payload} (has_human_player={self.has_human_player})")
            await self._post_to_discord(payload)
            return
           
        print(f"[{now_iso()}] Unmatched log: {cleaned_line} (has_human_player={self.has_human_player})")

    def _parse_mapchange(self, line: str) -> Optional[str]:
        m_mapchange = re.compile(
            r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: -------- Mapchange to (.+?) --------$'
        ).search(line)
        m_started_map = re.compile(
            r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: Started map "(.+?)"'
        ).search(line)

        map_name = None
        if m_mapchange:
            map_name = m_mapchange.group(1)
        elif m_started_map:
            map_name = m_started_map.group(1)

        if map_name:
            # snapshot current state before reset
            had_humans = self.has_human_player  

            # reset for the new map
            self.has_human_player = False
            self.connected_players = {}
            self.suppress_join_messages = True

            # Only announce if humans were present or bot messages are allowed
            if had_humans or self.allow_bot_messages:
                return f"πŸ”„ Map changed to `{map_name}`"
            else:
                print(f"[{now_iso()}] Suppressing mapchange announcement ({map_name}) β€” no human players were online.")
                return None

        return None

    def _parse_steam_validated(self, line: str) -> bool:
        m = STEAM_VALIDATE_REGEX.search(line)
        if m:
            name, steamid, team = m.groups()
            print(f"[{now_iso()}] Handling 'STEAM USERID validated' for {name} ({steamid}).")
            if steamid != "BOT":
                if "STEAM_ID_PENDING" in self.connected_players:
                    pending_name = self.connected_players.pop("STEAM_ID_PENDING")
                    self.connected_players[steamid] = pending_name
                    if "STEAM_ID_PENDING" in self.player_flags:
                        self.player_flags[steamid] = self.player_flags.pop("STEAM_ID_PENDING")
                        print(f"[{now_iso()}] Transferred flag for {pending_name} from STEAM_ID_PENDING to {steamid}: {self.player_flags[steamid]}")
            return True
        return False

    def _parse_rcon(self, line: str) -> bool:
        m = RCON_REGEX.search(line)
        if m:
            command, _ = m.groups()
            if not command.strip():  # Ignore incomplete RCON commands
                print(f"[{now_iso()}] [DEBUG] Ignored incomplete RCON command: {line}")
                return True
            print(f"[{now_iso()}] Handled RCON command log: {command}")
            return True
        return False

    def _parse_connected(self, line: str) -> bool:
        m = CONNECT_REGEX.search(line)
        if m:
            name, steamid, team, ip, port = m.groups()
            print(f"[{now_iso()}] [DEBUG] Matched CONNECT_REGEX for: {name} ({steamid}, IP {ip})")
            if steamid == "BOT":
                print(f"[{now_iso()}] Bot connected: {name} ({steamid})")
                self.player_flags[steamid] = "πŸ€–"
                print(f"[{now_iso()}] Assigned flag code for {name} ({steamid}): 'None' -> 'πŸ€–'")
                return True
           
            self.connected_players[steamid] = name
            self.has_human_player = True

            flag = "🏳️"
            country_code = None
            if self.geoip_reader:
                try:
                    response = self.geoip_reader.country(ip)
                    country_code = response.country.iso_code
                    if country_code:
                        flag = cc_to_flag(country_code)
                        print(f"[{now_iso()}] GeoIP lookup for {name} ({steamid}, IP {ip}): country code '{country_code}' -> flag '{flag}'")
                    else:
                        print(f"[{now_iso()}] [WARN] No country code found for IP {ip}")
                except Exception as e:
                    print(f"[{now_iso()}] [ERROR] GeoIP lookup failed for IP {ip}: {e}")
            else:
                print(f"[{now_iso()}] [ERROR] GeoIP reader not initialized for {name} ({steamid})")
           
            self.player_flags[steamid] = flag
            print(f"[{now_iso()}] Assigned flag code for {name} ({steamid}): '{country_code or 'None'}' -> '{flag}'")
            print(f"[{now_iso()}] [DEBUG] GeoIP database path: {self.cfg.get('geoip_db', '/home/csserver/discord/GeoLite2-Country.mmdb')}")
            print(f"[{now_iso()}] [DEBUG] Player flags dictionary: {self.player_flags}")
            return True
        else:
            print(f"[{now_iso()}] [DEBUG] CONNECT_REGEX failed to match: {line}")
        return False

    def _parse_join_leave(self, line: str) -> Optional[str]:
        # Ignore raw connection lines like:  " from "64.188.91.127:53453"
        if ' from "' in line:
            print(f"[{now_iso()}] Ignoring connection info line: {line.strip()}")
            return None

        entered_the_game_regex = re.compile(
            r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)?>" entered the game$'
        )

        m = entered_the_game_regex.search(line)
        if m:
            name, steamid, team = m.groups()[:3]
            if steamid == "BOT":
                self.player_flags[steamid] = "πŸ€–"
                print(f"[{now_iso()}] Stored flag for {name} ({steamid}): πŸ€–")
                print(f"[{now_iso()}] Ignoring bot join message for {name}.")
                return None
        entered_the_game_regex = re.compile(
            r'L \d{2}/\d{2}/\d{4} - \d{2}:\d{2}:\d{2}: "(.+?)<\d+><(.*?)><(.*?)?>" entered the game$'
        )

        m = entered_the_game_regex.search(line)
        if m:
            name, steamid, team = m.groups()[:3]
            if steamid == "BOT":
                self.player_flags[steamid] = "πŸ€–"
                print(f"[{now_iso()}] Stored flag for {name} ({steamid}): πŸ€–")
                print(f"[{now_iso()}] Ignoring bot join message for {name}.")
                return None
           
            if self.suppress_join_messages:
                print(f"[{now_iso()}] Suppressing join message for {name} ({steamid}) during map change.")
                self.connected_players[steamid] = name
                self.has_human_player = True
                return None

            self.connected_players[steamid] = name
            self.has_human_player = True
            name = self._sanitize_name(name)
            flag = self.player_flags.get(steamid, "🏳️")
            if flag == "🏳️" and steamid != "BOT":
                print(f"[{now_iso()}] [WARN] No flag found for {name} ({steamid}), attempting GeoIP lookup")
                m_connect = CONNECT_REGEX.search(line)
                if m_connect:
                    _, _, _, ip, _ = m_connect.groups()
                    if self.geoip_reader and ip:
                        try:
                            response = self.geoip_reader.country(ip)
                            country_code = response.country.iso_code
                            if country_code:
                                flag = cc_to_flag(country_code)
                                self.player_flags[steamid] = flag
                                print(f"[{now_iso()}] Fallback GeoIP lookup for {name} ({steamid}, IP {ip}): country code '{country_code}' -> flag '{flag}'")
                            else:
                                print(f"[{now_iso()}] [WARN] No country code found for IP {ip} in fallback lookup")
                        except Exception as e:
                            print(f"[{now_iso()}] [ERROR] Fallback GeoIP lookup failed for IP {ip}: {e}")
            print(f"[{now_iso()}] Human player joined: {name} ({steamid}), flag={flag}, has_human_player={self.has_human_player}")
            return f'πŸ‘‹ {flag} `{name}` joined the server'

        for rx in JOIN_LEAVE_REGEXES:
            m = rx.search(line)
            if m:
                if "disconnected" in line:
                    name, steamid, _, reason = m.groups()
                    if steamid == "BOT":
                        print(f"[{now_iso()}] Ignoring bot leave message for {name}.")
                        return None
                   
                    if self.suppress_join_messages:
                        print(f"[{now_iso()}] Suppressing leave message for {name} ({steamid}) during map change window")
                        if steamid in self.connected_players:
                            del self.connected_players[steamid]
                        return None
                   
                    name = self._sanitize_name(name)
                    flag = self.player_flags.get(steamid, "🏳️")
                    leave_message = f'❌ {flag} `{name}` left the server' + (f' ({reason})' if reason else '')
                    if steamid in self.connected_players:
                        del self.connected_players[steamid]
                        if not any(sid != "BOT" for sid in self.connected_players):
                            self.has_human_player = False
                            print(f"[{now_iso()}] No human players remain, has_human_player={self.has_human_player}")
                    print(f"[{now_iso()}] Human player left: {name} ({steamid}), flag={flag}, has_human_player={self.has_human_player}")
                    return leave_message
                elif "joined team" in line:
                    name, steamid, team, _ = m.groups()
                    if steamid == "BOT":
                        self.player_flags[steamid] = "πŸ€–"
                        print(f"[{now_iso()}] Stored flag for {name} ({steamid}): πŸ€–")
                        print(f"[{now_iso()}] Ignoring bot joined team message.")
                        return None
                    self.has_human_player = True
                    print(f"[{now_iso()}] Human detected in joined team: {name} ({steamid}), has_human_player={self.has_human_player}")
                    return None
        return None

    def _parse_say(self, line: str):
        for rx in HL_SAY_REGEXES:
            m = rx.search(line)
            if m:
                name, steamid, team, msg = m.groups()
                team_flag = "say_team" in rx.pattern or "say_team" in line
                name = self._sanitize_name(name)
                msg = self._sanitize_cs(msg)
                flag = self.player_flags.get(steamid, "πŸ€–" if steamid == "BOT" else "🏳️")
                if steamid != "BOT":
                    self.has_human_player = True
                    print(f"[{now_iso()}] Human detected in say: {name} ({steamid}), has_human_player={self.has_human_player}")
                team_icon = "πŸ”΅" if team.upper().startswith("CT") else "πŸ”΄" if team.upper().startswith("T") else "βšͺ"
                dead_indicator = " πŸͺ¦" if "(dead)" in line else ""
                payload = f'πŸ’¬ {flag} {team_icon} `{name}`{dead_indicator}: {msg}'
                return True, team_flag, payload
        return False, False, None

    def _parse_kill(self, line: str) -> Optional[str]:
        m = KILL_REGEX.search(line)
        if not m:
            return None
        killer_name, killer_id, killer_team, victim_name, victim_id, victim_team, weapon = m.groups()

        if killer_id == "BOT" and victim_id == "BOT":
            print(f"[{now_iso()}] Ignoring kill message (bot killed bot).")
            return None

        if killer_id != "BOT" or victim_id != "BOT":
            self.has_human_player = True
            print(f"[{now_iso()}] Human detected in kill: {killer_name} ({killer_id}) or {victim_name} ({victim_id}), has_human_player={self.has_human_player}")

        killer_flag = self.player_flags.get(killer_id, "πŸ€–" if killer_id == "BOT" else "🏳️")
        victim_flag = self.player_flags.get(victim_id, "πŸ€–" if victim_id == "BOT" else "🏳️")
        killer_name = self._sanitize_name(killer_name)
        victim_name = self._sanitize_name(victim_name)

        headshot = " πŸ’₯" if "(headshot)" in line else ""
        killer_team_icon = "πŸ”΅" if killer_team.upper().startswith("CT") else "πŸ”΄" if killer_team.upper().startswith("T") else "βšͺ"
        victim_team_icon = "πŸ”΅" if victim_team.upper().startswith("CT") else "πŸ”΄" if victim_team.upper().startswith("T") else "βšͺ"

        # Determine article
        vowel_sounds = ('a', 'e', 'i', 'o', 'u')
        exceptions = {"tmp", "usp", "ump", "uzi"}  # extend if needed
        if weapon.lower() in exceptions:
            article = "a"
        else:
            article = "an" if weapon.lower().startswith(vowel_sounds) else "a"

        return f'{killer_flag} {killer_team_icon} `{killer_name}` killed {victim_flag} {victim_team_icon} `{victim_name}` with {article} *{weapon}*{headshot}'

    def _parse_suicide(self, line: str) -> Optional[str]:
        for rx in SUICIDE_REGEXES:
            m = rx.search(line)
            if m:
                name, steamid, _, weapon = m.groups()
                if steamid != "BOT":
                    self.has_human_player = True
                    print(f"[{now_iso()}] Human detected in suicide: {name} ({steamid}), has_human_player={self.has_human_player}")
                name = self._sanitize_name(name)
                flag = self.player_flags.get(steamid, "πŸ€–" if steamid == "BOT" else "🏳️")
                return f'πŸ’€ {flag} `{name}` died ({weapon})'
        return None

    def _parse_bomb(self, line: str) -> Optional[str]:
        m = BOMB_REGEX.search(line)
        if m:
            name, steamid, _, event = m.groups()
            name = self._sanitize_name(name)
            flag = self.player_flags.get(steamid, "πŸ€–" if steamid == "BOT" else "🏳️")
            if steamid != "BOT":
                self.has_human_player = True
                print(f"[{now_iso()}] Human detected in bomb event: {name} ({steamid}), has_human_player={self.has_human_player}")
            if event == "Planted_The_Bomb":
                return 'πŸ’£ *The bomb has been planted!*'
        m = TEAM_BOMB_REGEX.search(line)
        if m:
            team, event, ct_score, t_score = m.groups()
            if event == "Bomb_Defused":
                return 'πŸ’£ *The bomb has been defused!*'
            elif event == "Target_Bombed":
                return 'πŸ’£ *Target successfully bombed!*'
        return None

    async def _post_to_discord(self, content: str, force=False):
        if not self.channel:
            print(f"[{now_iso()}] [ERROR] Discord channel not set, cannot post: {content}")
            return

        # Always allow join/leave messages through (mapchange suppression already handled in _parse_join_leave)
        if force or content.startswith(("πŸ‘‹", "❌")) or self.has_human_player or self.allow_bot_messages:
            try:
                await self.channel.send(content)
                print(f"[{now_iso()}] Successfully posted to Discord: {content} (has_human_player={self.has_human_player})")
            except Exception as e:
                print(f"[{now_iso()}] [ERROR] Failed to send to Discord: {e} (content={content})")
        else:
            print(f"[{now_iso()}] Suppressing Discord message due to no human players: {content} (has_human_player={self.has_human_player})")

    def _sanitize_discord(self, s: str) -> str:
        s = s.replace('"', '').strip()
        return ''.join(ch for ch in s if 31 < ord(ch) < 127)

    def _sanitize_cs(self, s: str) -> str:
        s = s.replace('"', "'")
        return ''.join(ch for ch in s if 31 < ord(ch) < 127)

    def _sanitize_name(self, s: str) -> str:
        s = s.strip()
        if len(s) > 24:
            s = s[:23] + "…"
        return self._sanitize_cs(s)

    async def run(self):
        listen_host = self.cfg.get("listen_host", "0.0.0.0")
        listen_port = int(self.cfg.get("listen_port", 8008))
        loop = asyncio.get_running_loop()
        print(f"[{now_iso()}] Listening for HL logs on {listen_host}:{listen_port}")
        try:
            await loop.create_datagram_endpoint(
                lambda: HLLogUDP(self.handle_hl_line),
                local_addr=(listen_host, listen_port)
            )
        except Exception as e:
            print(f"[{now_iso()}] [ERROR] Failed to start UDP listener: {e}")
        await self.bot.start(self.cfg["discord_token"])

def load_config(path: str) -> dict:
    try:
        with open(path, "rb") as f:
            cfg = tomllib.load(f)
    except Exception as e:
        print(f"[{now_iso()}] [ERROR] Failed to load config {path}: {e}")
        raise SystemExit(f"Failed to load config: {e}")
    required = ["discord_token", "discord_channel_id", "cs_host", "cs_port", "cs_rcon_password"]
    for key in required:
        if key not in cfg:
            print(f"[{now_iso()}] [ERROR] Missing required config key: {key}")
            raise SystemExit(f"Missing required config key: {key}")
    return cfg

async def amain(cfg_path: str):
    cfg = load_config(cfg_path)
    bridge = CSDiscordBridge(cfg)
    await bridge.run()

def main():
    cfg_path = os.environ.get("CSBRIDGE_CONFIG", "config.toml")
    try:
        asyncio.run(amain(cfg_path))
    except KeyboardInterrupt:
        print(f"[{now_iso()}] Exiting...")
    except Exception as e:
        print(f"[{now_iso()}] [ERROR] Fatal error: {e}")

if __name__ == "__main__":
    main()

Leave a Comment

Auto