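Requirement: I'm learning Python as I go for a project, and this time I need Socket.IO support. Putting the socket server directly in main.py works fine, but I also want to call its emit method from other files, which takes a few adjustments to the file structure.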
1. Front-end code:
<template>
  <div>
    <h1>Hello Socket</h1>
  </div>
</template>
<script>
import * as io from 'socket.io-client';
export default {
  mounted() {
    this.socket = io.connect("http://localhost:8000");
    this.socket.emit('Client', "Hello, Server!");
    this.socket.on("Server", args => {
      console.log(args)
    });
  }
}
</script>
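2. The Python code
Create a new file, socket.py (it is imported later as controllers.socket, so it lives in the controllers package):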
import socketio

# Guard so the background task is started only once, on the first connection.
background_task_started = False

# Module-level server instance; other files import `sio` from here.
sio = socketio.AsyncServer(
    async_mode='asgi',
    cors_allowed_origins='*')


async def background_task():
    # Broadcast a message to all connected clients every 10 seconds.
    count = 0
    while True:
        await sio.sleep(10)
        count += 1
        await sio.emit('Server', {'data': "Hello, Client"})


@sio.event
async def connect(sid, environ):
    print(f'{sid} connected!')
    global background_task_started
    if not background_task_started:
        sio.start_background_task(background_task)
        background_task_started = True
    await sio.emit("Server", {'data': "Connect",
                              'count': 0})


@sio.event
async def disconnect(sid):
    print('disconnect', sid)
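The once-only guard in connect can be tried without a browser client. The following is a minimal sketch using only asyncio: RecordingServer, ConnectionHandler, and demo are hypothetical names made up for illustration, and RecordingServer is a stand-in that merely records what would be emitted; it is not part of python-socketio.

```python
import asyncio


class RecordingServer:
    """Hypothetical stand-in for the real server; it only records emits."""

    def __init__(self):
        self.sent = []
        self.tasks = []

    async def emit(self, event, data):
        self.sent.append((event, data))

    def start_background_task(self, target, *args):
        # Schedule the coroutine on the running event loop.
        task = asyncio.create_task(target(*args))
        self.tasks.append(task)
        return task


class ConnectionHandler:
    """Starts one periodic task on the first connection only."""

    def __init__(self, server, interval=0.01):
        self.server = server
        self.interval = interval
        self.started = False

    async def heartbeat(self):
        while True:
            await asyncio.sleep(self.interval)
            await self.server.emit('Server', {'data': 'Hello, Client'})

    async def on_connect(self, sid):
        if not self.started:
            self.server.start_background_task(self.heartbeat)
            self.started = True
        await self.server.emit('Server', {'data': 'Connect', 'count': 0})


async def demo():
    server = RecordingServer()
    handler = ConnectionHandler(server)
    await handler.on_connect('client-1')
    await handler.on_connect('client-2')
    started = len(server.tasks)
    for task in server.tasks:
        task.cancel()  # stop the endless heartbeat before returning
    return started, server.sent[:2]
```

Calling asyncio.run(demo()) connects two fake clients and reports how many background tasks were started, which should be one regardless of the number of connections.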
3. main.py
from fastapi import FastAPI
import socketio
import uvicorn

from controllers.socket import sio
from db.mongodb_utils import close_mongo_connection, connect_to_mongo


def get_app() -> FastAPI:
    # The metadata belongs in the constructor, not in the return annotation.
    app = FastAPI(
        title="接口",
        description="api",
        version="0.0.1"
    )
    app.add_event_handler("startup", connect_to_mongo)
    app.add_event_handler("shutdown", close_mongo_connection)
    # Mount the Socket.IO server. It is not given other_asgi_app=app, since that
    # would send unmatched paths back into the app that mounts it.
    sio_app = socketio.ASGIApp(sio)
    app.mount('/', sio_app)
    return app


app = get_app()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
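python-socketio's ASGIApp can also wrap a second ASGI application through its other_asgi_app parameter and forward non-Socket.IO traffic to it. The sketch below illustrates that routing idea in plain ASGI terms, using only the standard library. It is a simplified illustration, not the library's implementation: make_text_app, dispatch_by_prefix, and call are hypothetical helpers, and the '/socket.io/' prefix is assumed to be the default path.

```python
import asyncio


def make_text_app(body):
    """Build a tiny ASGI app that answers every HTTP request with `body`."""
    async def app(scope, receive, send):
        await send({'type': 'http.response.start', 'status': 200,
                    'headers': [(b'content-type', b'text/plain')]})
        await send({'type': 'http.response.body', 'body': body.encode()})
    return app


def dispatch_by_prefix(socket_app, other_app, prefix='/socket.io/'):
    """Route requests under `prefix` to socket_app, everything else to other_app."""
    async def app(scope, receive, send):
        is_request = scope['type'] in ('http', 'websocket')
        if is_request and scope['path'].startswith(prefix):
            await socket_app(scope, receive, send)
        else:
            await other_app(scope, receive, send)
    return app


async def call(app, path):
    """Send one fake GET request through `app` and return the response body."""
    sent = []

    async def receive():
        return {'type': 'http.request', 'body': b'', 'more_body': False}

    async def send(message):
        sent.append(message)

    await app({'type': 'http', 'method': 'GET', 'path': path}, receive, send)
    return b''.join(m.get('body', b'') for m in sent)
```

With combined = dispatch_by_prefix(make_text_app('socket'), make_text_app('api')), a request for '/socket.io/' reaches the first app and any other path reaches the second. The wrapped application must not route back to the wrapper, otherwise unmatched paths loop between the two.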
4. Using it in other files
To emit from another file, import sio and await emit inside an async function:
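from controllers.socket import sio


# notify_clients is a hypothetical name; emit() is a coroutine,
# so it has to be awaited inside an async function.
async def notify_clients():
    await sio.emit(
        'Server',
        {'data': "大铁牛"}
    )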
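If the caller is synchronous code running in another thread, it cannot await emit directly. One option from the standard library is asyncio.run_coroutine_threadsafe, which hands the coroutine to the event loop the server runs on. The sketch below assumes that setup: RecordingServer is a hypothetical stand-in that only records emits, and emit_from_thread and demo are illustrative names, not part of python-socketio.

```python
import asyncio


class RecordingServer:
    """Hypothetical stand-in for the real server; it only records emits."""

    def __init__(self):
        self.sent = []

    async def emit(self, event, data):
        self.sent.append((event, data))


def emit_from_thread(server, loop, event, data, timeout=5):
    """Schedule server.emit() on `loop` from a non-async thread and wait for it."""
    future = asyncio.run_coroutine_threadsafe(server.emit(event, data), loop)
    return future.result(timeout)


async def demo():
    server = RecordingServer()
    loop = asyncio.get_running_loop()
    # Run the blocking caller in a worker thread so the loop stays free
    # to execute the scheduled emit.
    await loop.run_in_executor(
        None, emit_from_thread, server, loop, 'Server', {'data': 'from a thread'})
    return server.sent
```

In a real application the loop would be the one uvicorn runs, captured once at startup, and server would be the sio instance imported from controllers.socket.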