# 22. Minimum server

Now let's build our own Telepact server.

# Install the Python library

pip install --pre telepact

# Create a schema
Create api/hello.telepact.yaml:
- info.Hello: {}
- fn.hello:
    name: string
  ->:
    - Ok_:
        message: string

# Create a tiny HTTP server
Create server.py:
import asyncio
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from telepact import FunctionRouter, Message, Server, TelepactSchema
# Server options; this minimal example defines no auth, so do not require it.
options = Server.Options()
options.auth_required = False

# Load the Telepact schema from the ./api directory.
schema = TelepactSchema.from_directory('./api')
async def hello(function_name: str, request_message: Message) -> Message:
    """Handle ``fn.hello`` by greeting the caller by name.

    Args:
        function_name: Name of the invoked function; the call's arguments
            are read from the request body under this key.
        request_message: Incoming Telepact message.

    Returns:
        A message with empty headers and an ``Ok_`` body carrying the
        greeting text.
    """
    name = request_message.body[function_name]['name']
    return Message({}, {'Ok_': {'message': f'Hello, {name}!'}})
# Route the schema's fn.hello function to the hello() handler.
function_router = FunctionRouter({'fn.hello': hello})
# Combine schema, routing and options into the server the HTTP handler calls.
telepact_server = Server(schema, function_router, options)
class Handler(BaseHTTPRequestHandler):
    """HTTP handler that passes POST bodies on /api/telepact to Telepact."""

    def do_POST(self) -> None:
        """Process one Telepact request and write the serialized response.

        Responds 404 for any other path and 400 when the Content-Length
        header is not a non-negative integer.
        """
        if self.path != '/api/telepact':
            self.send_response(404)
            self.end_headers()
            return

        # A malformed or negative Content-Length must never reach
        # rfile.read(): int() would raise, and read(-1) would block until
        # the client closes the connection.
        try:
            content_length = int(self.headers.get('Content-Length', '0'))
        except ValueError:
            content_length = -1
        if content_length < 0:
            self.send_response(400)
            self.end_headers()
            return

        request_bytes = self.rfile.read(content_length)
        # http.server handlers are synchronous, so drive the async call to
        # completion here.
        response = asyncio.run(telepact_server.process(request_bytes))

        content_type = 'application/octet-stream' if '@bin_' in response.headers else 'application/json'
        self.send_response(200)
        self.send_header('Content-Type', content_type)
        self.end_headers()
        self.wfile.write(response.bytes)

    def log_message(self, format_string: str, *args: object) -> None:
        """Suppress the default per-request logging."""
        return
if __name__ == '__main__':
    # Listen on the loopback interface, port 8002, until the process is stopped.
    ThreadingHTTPServer(('127.0.0.1', 8002), Handler).serve_forever()

# Run it
python server.py

# Call it with curl

curl -s localhost:8002/api/telepact -d '[{}, {"fn.hello": {"name": "Telepact"}}]'

[{}, {"Ok_": {"message": "Hello, Telepact!"}}]

Even this tiny server already gives clients the standard Telepact experience: fn.ping_, fn.api_, validation, select, binary, codegen, and mocking.
Next: 23. Logging