diff --git a/examples/python/xds/README.md b/examples/python/xds/README.md index cd9587a421c..cba8c0691b9 100644 --- a/examples/python/xds/README.md +++ b/examples/python/xds/README.md @@ -56,7 +56,7 @@ export GRPC_XDS_BOOTSTRAP=/etc/xds-bootstrap.json 3. Run the client: ``` -python client.py xds-experimental:///my-backend +python client.py xds:///my-backend ``` ### Verifying Configuration with a CLI Tool @@ -101,3 +101,23 @@ grpc.health.v1.Health/Check "status": "SERVING" } ``` + +### Running with Proxyless Security + +#### Run the Server with Secure Credentials + +Add the `--secure true` flag to the invocation outlined above. + +```sh +python server.py --secure true +``` + +#### Run the Client with Secure Credentials + +Add the `--secure true` flag to the invocation outlined above. + +3. Run the client: + +``` +python client.py xds:///my-backend --secure true +``` diff --git a/examples/python/xds/client.py b/examples/python/xds/client.py index ee300265a92..a969b603fad 100644 --- a/examples/python/xds/client.py +++ b/examples/python/xds/client.py @@ -18,6 +18,7 @@ import logging import argparse import grpc +import grpc.experimental import helloworld_pb2 import helloworld_pb2_grpc @@ -25,11 +26,17 @@ import helloworld_pb2_grpc _DESCRIPTION = "Get a greeting from a server." -def run(server_address): - with grpc.insecure_channel(server_address) as channel: +def run(server_address, secure): + if secure: + fallback_creds = grpc.experimental.insecure_channel_credentials() + channel_creds = grpc.xds_channel_credentials(fallback_creds) + channel = grpc.secure_channel(server_address, channel_creds) + else: + channel = grpc.insecure_channel(server_address) + with channel: stub = helloworld_pb2_grpc.GreeterStub(channel) response = stub.SayHello(helloworld_pb2.HelloRequest(name='you')) - print("Greeter client received: " + response.message) + print("Greeter client received: " + response.message) if __name__ == '__main__': @@ -37,6 +44,10 @@ if __name__ == '__main__': parser.add_argument("server", default=None, help="The address of the server.") + parser.add_argument( + "--xds-creds", + action="store_true", + help="If specified, uses xDS credentials to connect to the server.") args = parser.parse_args() logging.basicConfig() - run(args.server) + run(args.server, args.xds_creds) diff --git a/examples/python/xds/requirements.txt b/examples/python/xds/requirements.txt index 7ba651e351a..6be56983639 100644 --- a/examples/python/xds/requirements.txt +++ b/examples/python/xds/requirements.txt @@ -1,4 +1,4 @@ -grpcio>=1.28.1 +grpcio>=1.37.1 protobuf grpcio-reflection grpcio-health-checking diff --git a/examples/python/xds/server.py b/examples/python/xds/server.py index 196e2c11f3a..82fabf26641 100644 --- a/examples/python/xds/server.py +++ b/examples/python/xds/server.py @@ -16,7 +16,6 @@ from concurrent import futures import argparse import logging -import multiprocessing import socket import grpc @@ -31,6 +30,16 @@ from grpc_health.v1 import health_pb2_grpc _DESCRIPTION = "A general purpose phony server." +_LISTEN_HOST = "[::]" + +_THREAD_POOL_SIZE = 256 + +logger = logging.getLogger() +console_handler = logging.StreamHandler() +formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s') +console_handler.setFormatter(formatter) +logger.addHandler(console_handler) + class Greeter(helloworld_pb2_grpc.GreeterServicer): @@ -43,19 +52,17 @@ class Greeter(helloworld_pb2_grpc.GreeterServicer): message=f"Hello {request.name} from {self._hostname}!") -def serve(port: int, hostname: str): - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())) - - # Add the application servicer to the server. - helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(hostname), server) +def _configure_maintenance_server(server: grpc.Server, + maintenance_port: int) -> None: + listen_address = f"{_LISTEN_HOST}:{maintenance_port}" + server.add_insecure_port(listen_address) # Create a health check servicer. We use the non-blocking implementation # to avoid thread starvation. health_servicer = health.HealthServicer( experimental_non_blocking=True, - experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) - health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server) + experimental_thread_pool=futures.ThreadPoolExecutor( + max_workers=_THREAD_POOL_SIZE)) # Create a tuple of all of the services we want to export via reflection. services = tuple( @@ -63,18 +70,57 @@ def serve(port: int, hostname: str): for service in helloworld_pb2.DESCRIPTOR.services_by_name.values()) + ( reflection.SERVICE_NAME, health.SERVICE_NAME) - # Add the reflection service to the server. - reflection.enable_server_reflection(services, server) - server.add_insecure_port(f"[::]:{port}") - server.start() - # Mark all services as healthy. - overall_server_health = "" - for service in services + (overall_server_health,): + health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server) + for service in services: health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING) + reflection.enable_server_reflection(services, server) - # Park the main application thread. - server.wait_for_termination() + +def _configure_greeter_server(server: grpc.Server, port: int, secure_mode: bool, + hostname) -> None: + # Add the application servicer to the server. + helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(hostname), server) + listen_address = f"{_LISTEN_HOST}:{port}" + if not secure_mode: + server.add_insecure_port(listen_address) + else: + # Use xDS credentials. + logger.info("Running with xDS Server credentials") + + # Fall back to insecure credentials. + server_fallback_creds = grpc.insecure_server_credentials() + server_creds = grpc.xds_server_credentials(server_fallback_creds) + server.add_secure_port(listen_address, server_creds) + + +def serve(port: int, hostname: str, maintenance_port: int, + secure_mode: bool) -> None: + if port == maintenance_port: + # If maintenance port and port are the same, start a single server. + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE)) + _configure_greeter_server(server, port, secure_mode, hostname) + _configure_maintenance_server(server, maintenance_port) + server.start() + logger.info("Greeter server listening on port %d", port) + logger.info("Maintenance server listening on port %d", maintenance_port) + server.wait_for_termination() + else: + # Otherwise, start two different servers. + greeter_server = grpc.server( + futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE), + xds=secure_mode) + _configure_greeter_server(greeter_server, port, secure_mode, hostname) + greeter_server.start() + logger.info("Greeter server listening on port %d", port) + maintenance_server = grpc.server( + futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE)) + _configure_maintenance_server(maintenance_server, maintenance_port) + maintenance_server.start() + logger.info("Maintenance server listening on port %d", maintenance_port) + greeter_server.wait_for_termination() + maintenance_server.wait_for_termination() if __name__ == '__main__': @@ -89,6 +135,11 @@ if __name__ == '__main__': default=None, nargs="?", help="The name clients will see in responses.") + parser.add_argument( + "--xds-creds", + action="store_true", + help="If specified, uses xDS credentials to connect to the server.") args = parser.parse_args() logging.basicConfig() - serve(args.port, args.hostname) + logger.setLevel(logging.INFO) + serve(args.port, args.hostname, args.port + 1, args.xds_creds)