[Python CSM] Get mesh-id from env var instead of bootstrap file (#37837)

Passed interop test:
  - [x] [grpc/core/master/linux/psm-csm-python](https://source.cloud.google.com/results/invocations/b9ba256b-31a9-4002-bd59-b21817aa9978)
<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes #37837

PiperOrigin-RevId: 686643728
pull/37594/merge
Xuan Wang 4 months ago committed by Copybara-Service
parent 0b13caed53
commit e5952ffd9e
  1. 43
      src/python/grpcio_csm_observability/grpc_csm_observability/_csm_observability_plugin.py
  2. 2
      src/python/grpcio_csm_observability/setup.py
  3. 2
      src/python/grpcio_observability/setup.py
  4. 64
      src/python/grpcio_tests/tests/observability/_csm_observability_plugin_test.py

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json
import os import os
import re import re
from typing import AnyStr, Callable, Dict, Iterable, List, Optional, Union from typing import AnyStr, Callable, Dict, Iterable, List, Optional, Union
@ -79,6 +78,7 @@ class CSMOpenTelemetryLabelInjector(OpenTelemetryLabelInjector):
"CSM_CANONICAL_SERVICE_NAME", UNKNOWN_VALUE "CSM_CANONICAL_SERVICE_NAME", UNKNOWN_VALUE
) )
workload_name_value = os.getenv("CSM_WORKLOAD_NAME", UNKNOWN_VALUE) workload_name_value = os.getenv("CSM_WORKLOAD_NAME", UNKNOWN_VALUE)
mesh_id = os.getenv("CSM_MESH_ID", UNKNOWN_VALUE)
gcp_resource = GoogleCloudResourceDetector().detect() gcp_resource = GoogleCloudResourceDetector().detect()
resource_type_value = get_resource_type(gcp_resource) resource_type_value = get_resource_type(gcp_resource)
@ -133,7 +133,7 @@ class CSMOpenTelemetryLabelInjector(OpenTelemetryLabelInjector):
self._additional_exchange_labels[ self._additional_exchange_labels[
"csm.workload_canonical_service" "csm.workload_canonical_service"
] = canonical_service_value ] = canonical_service_value
self._additional_exchange_labels["csm.mesh_id"] = get_mesh_id() self._additional_exchange_labels["csm.mesh_id"] = mesh_id
def get_labels_for_exchange(self) -> Dict[str, AnyStr]: def get_labels_for_exchange(self) -> Dict[str, AnyStr]:
return self._exchange_labels return self._exchange_labels
@ -302,42 +302,3 @@ def get_resource_type(gcp_resource: Resource) -> str:
return TYPE_GCE return TYPE_GCE
else: else:
return gcp_resource_type return gcp_resource_type
# Returns the mesh ID by reading and parsing the bootstrap file. Returns "unknown"
# if for some reason, mesh ID could not be figured out.
def get_mesh_id() -> str:
config_contents = get_bootstrap_config_contents()
try:
config_json = json.loads(config_contents)
# The expected format of the Node ID is -
# projects/[GCP Project number]/networks/mesh:[Mesh ID]/nodes/[UUID]
node_id_parts = config_json.get("node", {}).get("id", "").split("/")
if len(node_id_parts) == 6 and node_id_parts[3].startswith(
MESH_ID_PREFIX
):
return node_id_parts[3][len(MESH_ID_PREFIX) :]
except json.decoder.JSONDecodeError:
return UNKNOWN_VALUE
return UNKNOWN_VALUE
def get_bootstrap_config_contents() -> str:
"""Get the contents of the bootstrap config from environment variable or file.
Returns:
The content from environment variable. Or empty str if no config was found.
"""
contents_str = ""
for source in ("GRPC_XDS_BOOTSTRAP", "GRPC_XDS_BOOTSTRAP_CONFIG"):
config = os.getenv(source)
if config:
if os.path.isfile(config): # Prioritize file over raw config
with open(config, "r") as f:
contents_str = f.read()
else:
contents_str = config
return contents_str

@ -27,7 +27,7 @@ import python_version
import grpc_version import grpc_version
CLASSIFIERS = [ CLASSIFIERS = [
"Development Status :: 4 - Beta", "Development Status :: 5 - Production/Stable",
"Programming Language :: Python", "Programming Language :: Python",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License", "License :: OSI Approved :: Apache Software License",

@ -41,7 +41,7 @@ import grpc_version
_parallel_compile_patch.monkeypatch_compile_maybe() _parallel_compile_patch.monkeypatch_compile_maybe()
CLASSIFIERS = [ CLASSIFIERS = [
"Development Status :: 4 - Beta", "Development Status :: 5 - Production/Stable",
"Programming Language :: Python", "Programming Language :: Python",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License", "License :: OSI Approved :: Apache Software License",

@ -14,10 +14,8 @@
from collections import defaultdict from collections import defaultdict
import datetime import datetime
import json
import logging import logging
import os import os
import random
import sys import sys
import time import time
from typing import Any, Callable, Dict, List, Optional, Set from typing import Any, Callable, Dict, List, Optional, Set
@ -259,68 +257,6 @@ class CSMObservabilityPluginTest(unittest.TestCase):
) )
) )
def testGetMeshIdFromConfig(self):
config_json = {
"node": {
"id": "projects/12345/networks/mesh:test_mesh_id/nodes/abcdefg"
}
}
config_str = json.dumps(config_json)
with mock.patch.dict(
os.environ, {"GRPC_XDS_BOOTSTRAP_CONFIG": config_str}
):
csm_plugin = CsmOpenTelemetryPlugin(
meter_provider=self._provider,
)
csm_label_injector = csm_plugin.plugin_options[
0
].get_label_injector()
additional_labels = csm_label_injector.get_additional_labels(
include_exchange_labels=True
)
self.assertEqual(additional_labels["csm.mesh_id"], "test_mesh_id")
def testGetMeshIdFromFile(self):
config_json = {
"node": {
"id": "projects/12345/networks/mesh:test_mesh_id/nodes/abcdefg"
}
}
config_file_path = "/tmp/" + str(random.randint(0, 100000))
with open(config_file_path, "w", encoding="utf-8") as f:
f.write(json.dumps(config_json))
with mock.patch.dict(
os.environ, {"GRPC_XDS_BOOTSTRAP": config_file_path}
):
csm_plugin = CsmOpenTelemetryPlugin(
meter_provider=self._provider,
)
csm_label_injector = csm_plugin.plugin_options[
0
].get_label_injector()
additional_labels = csm_label_injector.get_additional_labels(
include_exchange_labels=True
)
self.assertEqual(additional_labels["csm.mesh_id"], "test_mesh_id")
def testGetMeshIdFromInvalidConfig(self):
config_json = {"node": {"id": "12345"}}
config_str = json.dumps(config_json)
with mock.patch.dict(
os.environ, {"GRPC_XDS_BOOTSTRAP_CONFIG": config_str}
):
csm_plugin = CsmOpenTelemetryPlugin(
meter_provider=self._provider,
)
csm_label_injector = csm_plugin.plugin_options[
0
].get_label_injector()
additional_labels = csm_label_injector.get_additional_labels(
include_exchange_labels=True
)
self.assertEqual(additional_labels["csm.mesh_id"], "unknown")
def _validate_all_metrics_names(self, metric_names: Set[str]) -> None: def _validate_all_metrics_names(self, metric_names: Set[str]) -> None:
self._validate_server_metrics_names(metric_names) self._validate_server_metrics_names(metric_names)
self._validate_client_metrics_names(metric_names) self._validate_client_metrics_names(metric_names)

Loading…
Cancel
Save