99999久久久久久亚洲,欧美人与禽猛交狂配,高清日韩av在线影院,一个人在线高清免费观看,啦啦啦在线视频免费观看www

熱線電話:13121318867

登錄
首頁大數(shù)據(jù)時代幾行代碼,擼了個 元宇宙
幾行代碼,擼了個 元宇宙
2021-12-13
收藏
幾行代碼,擼了個 元宇宙

作者:李曉飛

來源:Python

Facebook 改名為 meta,一下子點燃了 元宇宙 這個概念。

今天我就用 Python 實現(xiàn)一個簡單的迷你元宇宙。

代碼簡潔易懂,不僅可以學習 Python 知識,還能用實踐理解元宇宙的概念。

還等什么,現(xiàn)在就開始吧!

迷你元宇宙

什么是元宇宙?

不同的人有不同的理解和認識,最能達成共識的是:

元宇宙是個接入點,每個人都可以成為其中的一個元素,彼此互動。

那么我們的元宇宙有哪些功能呢?

首先必須有可以接入的功能。

然后彼此之間可以交流信息。比如 a 發(fā)消息給 b,b 可以發(fā)消息給 a,同時可以將消息廣播出去,也就是成員之間,可以私信 和 群聊。

另外,在元宇宙的成員可以收到元宇宙的動態(tài),比如新人加入,或者有人離開等,如果玩膩了,可以離開元宇宙。

最終的效果像這樣:

幾行代碼,擼了個 元宇宙

效果

設計

如何構建接入點

直接思考可能比較困難,換個角度想,接入點其實就是 —— 服務器。

只要是上網(wǎng),每時每刻,我們都是同服務器打交的。

那就選擇最簡單的 TCP 服務器,TCP 服務器的核心是維護套接字(socket)的狀態(tài),向其中發(fā)送或者獲取信息。

python 的 socket 庫,提供了很多有關便捷方法,可以幫助我們構建。

核心代碼如下:

import socket socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socket.bind((ip, port)) socket.listen() data = socket.recv(1024) ... 

創(chuàng)建一個 socket,讓其監(jiān)聽機器所擁有的一個 ip 和 端口,然后從 socket 中讀取發(fā)送過來的數(shù)據(jù)。

如何構建客戶端

客戶端是為了方便用戶鏈接到元宇宙的工具,這里,就是能鏈接到服務器的工具,服務器是 TCP 服務器,客戶端自然需要用可以鏈接 TCP 服務器的方式。

python 也已為我們備好,幾行代碼就可以搞定,核心代碼如下:

import socket client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect((ip, port)) data = client.recv(1024) ... 

代碼與服務器很像,不過去鏈接一個服務器的 ip 和 端口。

如何構建業(yè)務邏輯

首先需要讓服務器將接入的用戶管理起來。

然后當接收到用戶消息時做出判斷,是轉發(fā)給其他用戶,廣播還是做出回應。

這樣就需要構造一種消息格式,用來表示用戶消息的類型或者目的。

我們就用 @username 的格式來區(qū)分,消息發(fā)給特殊用戶還是群發(fā)。

另外,為了完成注冊功能,需要再定義一種命令格式,用于設置 username,我們可以用 name:username 的格式作為設置用戶名的命令。

構建

有了初步設計,就可以進一步構建我們的代碼了。

服務端

服務器需要同時響應多個鏈接,其中包括新鏈接創(chuàng)建,消息 和 鏈接斷開 等。

為了不讓服務器阻塞,我們采用非阻塞的鏈接,當鏈接接入時,將鏈接存儲起來,然后用 select 工具,等待有了消息的鏈接。

這個功能,已經(jīng)有人實現(xiàn)好了 simpletcp[1],只要稍作改動就好。

將其中的收到消息,建立鏈接,關閉鏈接做成回調(diào)方法,以便再外部編寫業(yè)務邏輯。

核心業(yè)務

這里說明一下核心代碼:

# 創(chuàng)建一個服務器鏈接 self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.setblocking(0) self._socket.bind((self.ip, self.port)) self._socket.listen(self._max_connections) # 存放已建立的鏈接 readers = [self._socket] # 存放客戶端 ip和端口 IPs = dict() # 退出標記 用于關閉服務器 self._stop = False # 服務器主循環(huán) while readers and not self._stop: # 利用 select 從 建立的鏈接中選取一些有新消息的 read, _, err = select.select(readers, [], readers) for sock in read: if sock is self._socket: # 建立了新鏈接 # 獲取新鏈接的 socket 以及 ip和端口 client_socket, client_ip = self._socket.accept() # 將鏈接設置為非阻塞的 client_socket.setblocking(0) # 添加到監(jiān)聽隊列 readers.append(client_socket) # 存儲ip信息 IPs[client_socket] = client_ip # 調(diào)用建立鏈接回調(diào)函數(shù) self.onCreateConn(self, client_socket, client_ip) else: # 收到了新消息 try: # 獲取消息 data = sock.recv(self.recv_bytes)
            except socket.error as e: if e.errno == errno.ECONNRESET: # 表明鏈接已退出 data = None else:
                    raise e if data: # 調(diào)用收到消息回調(diào)函數(shù) self.onReceiveMsg(self, sock, IPs[sock], data) else: # 鏈接退出時,移除監(jiān)聽隊列 readers.remove(sock)
                sock.close() # 調(diào)用鏈接關閉回調(diào)函數(shù) self.onCloseConn(self, sock, IPs[sock]) # 處理存在錯誤的鏈接 for sock in err: # 移除監(jiān)聽隊列 readers.remove(sock)
        sock.close() # 調(diào)用鏈接關閉回調(diào)函數(shù) self.onCloseConn(self, sock, IPs[sock])
  • 首先利用 socket 建立一個服務器鏈接,這個和最初的 socket 核心代碼一樣
  • 不同的是設置鏈接為非阻塞的,這樣就可以通過 select 同時監(jiān)控多個鏈接,也不至于阻塞服務器了。關于 select 可以看這里[2]
  • 在主循環(huán)中,篩選出有了消息的鏈接,判斷是建立鏈接還是消息發(fā)送,調(diào)用不同的回調(diào)函數(shù)
  • 最后處理一下異常

事件處理

現(xiàn)在通過回調(diào)函數(shù),就可以編寫業(yè)務了,之間看代碼。

這段是建立鏈接時的處理:

def onCreateConn(server, sock, ip): cid = f'{ip[0]}_{ip[1]}' clients[cid] = {'cid': cid, 'sock': sock, 'name': None}
    sock.send("你已經(jīng)接入元宇宙,告訴我你的代號,輸入格式為 name:lily.".encode('utf-8'))
  • 首先計算出客戶端 id,即 cid,通過 ip 和 端口 組成
  • clients 是個詞典,用 cid 為 key,存儲了 cid、鏈接、和名稱
  • 一旦建立起來鏈接,向鏈接發(fā)送一段問候語,并要求其設置自己的名稱

然后是接收消息的回調(diào)函數(shù),這個相對復雜一些,主要是處理的情況更多:

def onReceiveMsg(server, sock, ip, data): cid = f'{ip[0]}_{ip[1]}' data = data.decode('utf-8')
    print(f"收到數(shù)據(jù): {data}")
    _from = clients[cid] if data.startswith('name:'): # 設置名稱 name = data[5:].strip() if not name:
            sock.send(f"不能設置空名稱,否則其他人找不見你".encode('utf-8')) elif not checkname(name, cid):
            sock.send(f"這個名字{name}已經(jīng)被使用,請換一個試試".encode('utf-8')) else: if not _from['name']:
                sock.send(f"{name} 很高興見到你,現(xiàn)在可以暢游元宇宙了".encode('utf-8'))
                msg = f"新成員{name} 加入了元宇宙,和TA聊聊吧".encode('utf-8')
                sendMsg(msg, _from) else:
                sock.send(f"更換名稱完成".encode('utf-8'))
                msg = f"{_from['name']} 更換名稱為 {name},和TA聊聊吧".encode('utf-8')
                sendMsg(msg, _from)
            _from['name'] = name elif '@' in data: # 私信 targets = re.findall(r'@(.+?) ', data)
        print(targets)
        msg = f"{_from['name']}: {data}".encode('utf-8')
        sendMsg(msg, _from, targets) else: # 群信 msg = f"{_from['name']}{data}".encode('utf-8')
        sendMsg(msg, _from)
  • 代碼分為兩大部分,if 前面是處理收到的消息,將 bytes 轉化為 字符串;if 開始處理具體的消息
  • 如果收到 name: 開頭的消息,表示需要設置用戶名,其中包括判重,以及給其他成員發(fā)送消息
  • 如果收到的消息里有 @,表示在發(fā)私信,先提取出需要發(fā)出的用戶們,然后將消息發(fā)送給對應的用戶
  • 如果沒有特殊標記,就表示群發(fā)
  • 其中 sendMsg 用于發(fā)送消息,接收三個參數(shù),第一個是消息,第二是發(fā)送者,第三個是接收者名稱數(shù)組

當鏈接關閉時,需要處理一下關閉的回調(diào)函數(shù):

def onCloseConn(server, sock, ip): cid = f'{ip[0]}_{ip[1]}' name = clients[cid]['name'] if name:
        msg = f"{name} 從元宇宙中消失了".encode('utf-8')
        sendMsg(msg, clients[cid]) del clients[cid]
  • 當收到鏈接斷開的消息時,合成消息,發(fā)送給其他用戶
  • 然后從客戶端緩存中刪除

客戶端

客戶端需要解決兩個問題,第一個是處理接收到的消息,第二個是允許用戶的輸入。

我們將接收消息作為一個線程,將用戶輸入作為主循環(huán)。

接收消息

先看接收消息的代碼:

def receive(client): while True: try:
            s_info = client.recv(1024) # 接受服務端的消息并解碼 if not s_info:
                print(f"{bcolors.WARNING}服務器鏈接斷開{bcolors.ENDC}") break print(f"{bcolors.OKCYAN}新的消息:{bcolors.ENDC}n", bcolors.OKGREEN + s_info.decode('utf-8')+ bcolors.ENDC) except Exception:
            print(f"{bcolors.WARNING}服務器鏈接斷開{bcolors.ENDC}") break if close: break 
  • 這是線程中用的代碼,接收一個客戶端鏈接作為參數(shù)
  • 在循環(huán)中不斷地從鏈接中獲取信息,如果沒有消息時 recv 方法會阻塞,直到有新的消息過來
  • 收到消息后,將消息寫出到控制臺上
  • bcolors 提供了一些顏色標記,將消息顯示為不同的顏色
  • close 是一個全局標記,如果客戶端需要退出時,會設置為 True,可以讓線程結束

輸入處理

下面再看一下輸入控制程序:

while True: pass value = input("")
    value = value.strip() if value == ':start': if thread:
            print(f"{bcolors.OKBLUE}您已經(jīng)在元宇宙中了{bcolors.ENDC}") else:
            client = createClient(ip, 5000)
            thread = Thread(target=receive, args=(client,))
            thread.start()
            print(f"{bcolors.OKBLUE}您進入元宇宙了{bcolors.ENDC}") elif value == ':quit' or value == ':stop': if thread:
            client.close()
            close = True print(f"{bcolors.OKBLUE}正在退出中…{bcolors.ENDC}")
            thread.join()
            print(f"{bcolors.OKBLUE}元宇宙已退出{bcolors.ENDC}")
            thread = None if value == ':quit':
            print(f"{bcolors.OKBLUE}退出程序{bcolors.ENDC}") break pass elif value == ':help':
        help() else: if client: # 聊天模式 client.send(value.encode('utf-8')) else:
            print(f'{bcolors.WARNING}還沒接入元宇宙,請先輸入 :start 接入{bcolors.ENDC}')
    client.close()
  • 主要是對不同的命令做出的相應,比如 :start 表示需要建立鏈接,:quit 表示退出等
  • 命令前加 : 是為了和一般的消息做區(qū)分,如果不帶 : 就認為是在發(fā)送消息

啟動

完成了整體編碼之后,就可以啟動了,最終的代碼由三部分組成。

第一部分是服務器端核心代碼,存放在 simpletcp.py 中。

第二部分是服務器端業(yè)務代碼,存放在 metaServer.py 中。

第三部分是客戶端代碼,存放在 metaClient.py 中。

另外需要一些輔助的處理,比如發(fā)送消息的 sendMsg 方法,顏色處理方法等,具體可以下載本文源碼了解。

進入代碼目錄,啟動命令行,執(zhí)行 python metaServer.py,輸入指令 start:

幾行代碼,擼了個 元宇宙

server

然后再打開一個命令行,執(zhí)行 python metaClient.py,輸入指令 :start,就可以接入到元宇宙:

幾行代碼,擼了個 元宇宙

client

設置自己的名字:

幾行代碼,擼了個 元宇宙

如果有新的成員加入時,就會得到消息提醒, 還可以玩點互動:

幾行代碼,擼了個 元宇宙

怎么樣好玩吧,一個元宇宙就這樣形成了,趕緊讓其他伙伴加入試試吧。

總結

元宇宙現(xiàn)在是個很熱的概念,但還是基于現(xiàn)有的技術打造的,元宇宙給人們提供了一個生活在虛擬的神奇世界里的想象空間,其實自從有了互聯(lián)網(wǎng),我們就已經(jīng)逐步生活在元宇宙之中了。

今天我們用基礎的 TCP 技術,構建了一個自己的元宇宙聊天室,雖然功能上和想象中的元宇宙相去甚遠,不過其中的主要功能已經(jīng)成形了。

如果有興趣還可以在這個基礎上加入更好玩的功能,比如好友,群組,消息記錄等等,在深入了解的同時,讓這個元宇宙更好玩。

數(shù)據(jù)分析咨詢請掃描二維碼

若不方便掃碼,搜微信號:CDAshujufenxi

數(shù)據(jù)分析師資訊
更多

OK
客服在線
立即咨詢
客服在線
立即咨詢
') } function initGt() { var handler = function (captchaObj) { captchaObj.appendTo('#captcha'); captchaObj.onReady(function () { $("#wait").hide(); }).onSuccess(function(){ $('.getcheckcode').removeClass('dis'); $('.getcheckcode').trigger('click'); }); window.captchaObj = captchaObj; }; $('#captcha').show(); $.ajax({ url: "/login/gtstart?t=" + (new Date()).getTime(), // 加隨機數(shù)防止緩存 type: "get", dataType: "json", success: function (data) { $('#text').hide(); $('#wait').show(); // 調(diào)用 initGeetest 進行初始化 // 參數(shù)1:配置參數(shù) // 參數(shù)2:回調(diào),回調(diào)的第一個參數(shù)驗證碼對象,之后可以使用它調(diào)用相應的接口 initGeetest({ // 以下 4 個配置參數(shù)為必須,不能缺少 gt: data.gt, challenge: data.challenge, offline: !data.success, // 表示用戶后臺檢測極驗服務器是否宕機 new_captcha: data.new_captcha, // 用于宕機時表示是新驗證碼的宕機 product: "float", // 產(chǎn)品形式,包括:float,popup width: "280px", https: true // 更多配置參數(shù)說明請參見:http://docs.geetest.com/install/client/web-front/ }, handler); } }); } function codeCutdown() { if(_wait == 0){ //倒計時完成 $(".getcheckcode").removeClass('dis').html("重新獲取"); }else{ $(".getcheckcode").addClass('dis').html("重新獲取("+_wait+"s)"); _wait--; setTimeout(function () { codeCutdown(); },1000); } } function inputValidate(ele,telInput) { var oInput = ele; var inputVal = oInput.val(); var oType = ele.attr('data-type'); var oEtag = $('#etag').val(); var oErr = oInput.closest('.form_box').next('.err_txt'); var empTxt = '請輸入'+oInput.attr('placeholder')+'!'; var errTxt = '請輸入正確的'+oInput.attr('placeholder')+'!'; var pattern; if(inputVal==""){ if(!telInput){ errFun(oErr,empTxt); } return false; }else { switch (oType){ case 'login_mobile': pattern = /^1[3456789]\d{9}$/; if(inputVal.length==11) { $.ajax({ url: '/login/checkmobile', type: "post", dataType: "json", data: { mobile: inputVal, etag: oEtag, page_ur: window.location.href, page_referer: document.referrer }, success: function (data) { } }); } break; case 'login_yzm': pattern = /^\d{6}$/; break; } if(oType=='login_mobile'){ } if(!!validateFun(pattern,inputVal)){ errFun(oErr,'') if(telInput){ $('.getcheckcode').removeClass('dis'); } }else { if(!telInput) { errFun(oErr, errTxt); }else { $('.getcheckcode').addClass('dis'); } return false; } } return true; } function errFun(obj,msg) { obj.html(msg); if(msg==''){ $('.login_submit').removeClass('dis'); }else { $('.login_submit').addClass('dis'); } } function validateFun(pat,val) { return pat.test(val); }