最近一段时间经常有一些数据导出相关的工作,需要从服务器导出 CSV / Excel 等文
件给 Sales 同学查看,而 Sales 的日常沟通基本都在 Slack 上完成。
如果采用传统方式:
服务器 → 下载到本地 → 再上传到 Slack
问题非常明显:
- 文件稍微一大,下载 + 上传非常慢
- 下午 4 点之后网速明显下降(可能放寒假了 🤷)
- 严重依赖本地网络,效率不可控
- 操作步骤多,不利于重复执行
因此就产生了一个很自然的想法:
能不能让服务器直接把文件传到 Slack?
答案是:完全可以。
一、整体思路
核心思路很简单:
- 文件本身就在服务器上
- Slack 提供了官方 API 用于文件上传
- 在服务器上通过脚本,直接调用 Slack API 上传文件
这样可以做到:
- 文件 不经过本地
- 完全依赖服务器网络
- 适合大文件、批量导出
- 可脚本化、可自动化
最终数据流变为:
服务器 → Slack
二、创建 Slack App
首先需要一个 Slack App,用于获取上传文件所需的 Token。
1. 新建 App
访问:
https://api.slack.com/apps
点击 Create New App,选择你的 Workspace。
2. 配置 OAuth & Permissions
在 OAuth & Permissions 中,添加以下常用权限(根据实际需要调整):
files:write
chat:write
channels:read(如果需要)
配置完成后,安装 App 到 Workspace,即可拿到:
- Bot User OAuth Token
- 格式一般为:
xoxb-xxxx
三、服务器上传脚本
下面是一个 可直接在服务器运行的 Python 脚本,用于:
- 上传文件到指定 Slack Channel
- 支持 Thread(可传消息链接)
- 支持 @ 指定用户
- 上传完成后输出文件链接
1. 脚本代码
import os import sys import time import re from slack_sdk import WebClient import config
SLACK_TOKEN = config.slack_token DEFAULT_MENTION = "xxxx"
def log(msg: str): """Print timestamped logs to stdout.""" ts = time.strftime("%Y-%m-%d %H:%M:%S") print(f"[{ts}] {msg}", flush=True)
def parse_thread_ts(thread_link: str) -> str: """ Convert a Slack message link to thread_ts. Example: https://workspace.slack.com/archives/xxxx/p1731398999934122 Returns: 1731398999.934122 """ match = re.search(r'/p(\d{16})', thread_link) if not match: raise ValueError(f"Invalid Slack message link: {thread_link}") ts_raw = match.group(1) return f"{ts_raw[:-6]}.{ts_raw[-6:]}"
def main(): if len(sys.argv) < 3: print("Usage: python file_uploader.py <channel_id> <file_path> [thread_link_or_ts] [user_ids...]") sys.exit(1)
channel_id = sys.argv[1] file_path = sys.argv[2] thread_input = sys.argv[3] if len(sys.argv) > 3 else None user_ids = sys.argv[4:] if len(sys.argv) > 4 else [DEFAULT_MENTION]
if not os.path.isfile(file_path): raise RuntimeError("File not found")
filename = os.path.basename(file_path) file_size = os.path.getsize(file_path)
thread_ts = None if thread_input: if thread_input.startswith("http"): thread_ts = parse_thread_ts(thread_input) else: thread_ts = thread_input
log(f"Preparing to upload: {filename}") log(f"File size: {file_size / 1024 / 1024:.2f} MB")
client = WebClient(token=SLACK_TOKEN) mentions = " ".join(f"<@{uid}>" for uid in user_ids)
with open(file_path, "rb") as f: kwargs = { "file": f, "title": filename, "channel": channel_id, "initial_comment": ( f"✅ New file uploaded: *{filename}* " f"({file_size / 1024 / 1024:.2f} MB)\n{mentions}" ), } if thread_ts: kwargs["thread_ts"] = thread_ts
response = client.files_upload_v2(**kwargs)
file_info = response["file"] log("✅ Upload completed successfully") log(f"📎 File link: {file_info.get('permalink')}")
if __name__ == "__main__": main()
|
四、运行方式
基本用法
python3 file_uploader.py <channel_id> <file_path>
|
上传到指定 Thread(推荐)
python3 file_uploader.py \ channel_id \ /data/sai.csv \ https://contactout.slack.com/archives/xxx/p1769005211787589
|
脚本会自动解析 Slack 消息链接,转换为 thread_ts。
五、效果说明
文件直接从服务器上传到 Slack,不依赖本地网络,上传完成后,Sales 可立即在 Slack
中下载。