-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathserver.py
More file actions
720 lines (669 loc) · 33.3 KB
/
server.py
File metadata and controls
720 lines (669 loc) · 33.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
import os
import json
import logging
import ssl
import argparse
from dataclasses import dataclass
from typing import Optional, Dict, Any
# MCP protocol related imports
from mcp.server.lowlevel import Server # MCP server base class
from mcp.server.sse import SseServerTransport # SSE transport support
from mcp import types # MCP type definitions
# pyVmomi VMware API imports
from pyVim import connect
from pyVmomi import vim, vmodl
# Configuration data class for storing configuration options
@dataclass
class Config:
vcenter_host: str
vcenter_user: str
vcenter_password: str
datacenter: Optional[str] = None # Datacenter name (optional)
cluster: Optional[str] = None # Cluster name (optional)
datastore: Optional[str] = None # Datastore name (optional)
network: Optional[str] = None # Virtual network name (optional)
insecure: bool = False # Whether to skip SSL certificate verification (default: False)
api_key: Optional[str] = None # API access key for authentication
log_file: Optional[str] = None # Log file path (if not specified, output to console)
log_level: str = "INFO" # Log level
# VMware management class, encapsulating pyVmomi operations for vSphere
class VMwareManager:
def __init__(self, config: Config):
self.config = config
self.si = None # Service instance (ServiceInstance)
self.content = None # vSphere content root
self.datacenter_obj = None
self.resource_pool = None
self.datastore_obj = None
self.network_obj = None
self.authenticated = False # Authentication flag for API key verification
self._connect_vcenter()
def _connect_vcenter(self):
"""Connect to vCenter/ESXi and retrieve main resource object references."""
try:
if self.config.insecure:
# Connection method without SSL certificate verification
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False # Disable hostname checking
context.verify_mode = ssl.CERT_NONE
self.si = connect.SmartConnect(
host=self.config.vcenter_host,
user=self.config.vcenter_user,
pwd=self.config.vcenter_password,
sslContext=context)
else:
# Standard SSL verification connection
self.si = connect.SmartConnect(
host=self.config.vcenter_host,
user=self.config.vcenter_user,
pwd=self.config.vcenter_password)
except Exception as e:
logging.error(f"Failed to connect to vCenter/ESXi: {e}")
raise
# Retrieve content root object
self.content = self.si.RetrieveContent()
logging.info("Successfully connected to VMware vCenter/ESXi API")
# Retrieve target datacenter object
if self.config.datacenter:
# Find specified datacenter by name
self.datacenter_obj = next((dc for dc in self.content.rootFolder.childEntity
if isinstance(dc, vim.Datacenter) and dc.name == self.config.datacenter), None)
if not self.datacenter_obj:
logging.error(f"Datacenter named {self.config.datacenter} not found")
raise Exception(f"Datacenter {self.config.datacenter} not found")
else:
# Default to the first available datacenter
self.datacenter_obj = next((dc for dc in self.content.rootFolder.childEntity
if isinstance(dc, vim.Datacenter)), None)
if not self.datacenter_obj:
raise Exception("No datacenter object found")
# Retrieve resource pool (if a cluster is configured, use the cluster's resource pool; otherwise, use the host resource pool)
compute_resource = None
if self.config.cluster:
# Find specified cluster
for folder in self.datacenter_obj.hostFolder.childEntity:
if isinstance(folder, vim.ClusterComputeResource) and folder.name == self.config.cluster:
compute_resource = folder
break
if not compute_resource:
logging.error(f"Cluster named {self.config.cluster} not found")
raise Exception(f"Cluster {self.config.cluster} not found")
else:
# Default to the first ComputeResource (cluster or standalone host)
compute_resource = next((cr for cr in self.datacenter_obj.hostFolder.childEntity
if isinstance(cr, vim.ComputeResource)), None)
if not compute_resource:
raise Exception("No compute resource (cluster or host) found")
self.resource_pool = compute_resource.resourcePool
logging.info(f"Using resource pool: {self.resource_pool.name}")
# Retrieve datastore object
if self.config.datastore:
# Find specified datastore in the datacenter
self.datastore_obj = next((ds for ds in self.datacenter_obj.datastoreFolder.childEntity
if isinstance(ds, vim.Datastore) and ds.name == self.config.datastore), None)
if not self.datastore_obj:
logging.error(f"Datastore named {self.config.datastore} not found")
raise Exception(f"Datastore {self.config.datastore} not found")
else:
# Default to the datastore with the largest available capacity
datastores = [ds for ds in self.datacenter_obj.datastoreFolder.childEntity if isinstance(ds, vim.Datastore)]
if not datastores:
raise Exception("No available datastore found in the datacenter")
# Select the one with the maximum free space
self.datastore_obj = max(datastores, key=lambda ds: ds.summary.freeSpace)
logging.info(f"Using datastore: {self.datastore_obj.name}")
# Retrieve network object (network or distributed virtual portgroup)
if self.config.network:
# Find specified network in the datacenter network list
networks = self.datacenter_obj.networkFolder.childEntity
self.network_obj = next((net for net in networks if net.name == self.config.network), None)
if not self.network_obj:
logging.error(f"Network {self.config.network} not found")
raise Exception(f"Network {self.config.network} not found")
logging.info(f"Using network: {self.network_obj.name}")
else:
self.network_obj = None # If no network is specified, VM creation can choose to not connect to a network
def list_vms(self) -> list:
"""List all virtual machine names."""
vm_list = []
# Create a view to iterate over all virtual machines
container = self.content.viewManager.CreateContainerView(self.content.rootFolder, [vim.VirtualMachine], True)
for vm in container.view:
vm_list.append(vm.name)
container.Destroy()
return vm_list
def find_vm(self, name: str) -> Optional[vim.VirtualMachine]:
"""Find virtual machine object by name."""
container = self.content.viewManager.CreateContainerView(self.content.rootFolder, [vim.VirtualMachine], True)
vm_obj = None
for vm in container.view:
if vm.name == name:
vm_obj = vm
break
container.Destroy()
return vm_obj
def get_vm_performance(self, vm_name: str) -> Dict[str, Any]:
"""Retrieve performance data (CPU, memory, storage, and network) for the specified virtual machine."""
vm = self.find_vm(vm_name)
if not vm:
raise Exception(f"VM {vm_name} not found")
# CPU and memory usage (obtained from quickStats)
stats = {}
qs = vm.summary.quickStats
stats["cpu_usage"] = qs.overallCpuUsage # MHz
stats["memory_usage"] = qs.guestMemoryUsage # MB
# Storage usage (committed storage, in GB)
committed = vm.summary.storage.committed if vm.summary.storage else 0
stats["storage_usage"] = round(committed / (1024**3), 2) # Convert to GB
# Network usage (obtained from host or VM NIC statistics, latest sample)
# Here we simply obtain the latest performance counter for VM network I/O
net_bytes_transmitted = 0
net_bytes_received = 0
try:
pm = self.content.perfManager
# Define performance counter IDs to query: network transmitted and received bytes
counter_ids = []
for c in pm.perfCounter:
counter_full_name = f"{c.groupInfo.key}.{c.nameInfo.key}.{c.rollupType}"
if counter_full_name in ("net.transmitted.average", "net.received.average"):
counter_ids.append(c.key)
if counter_ids:
query = vim.PerformanceManager.QuerySpec(maxSample=1, entity=vm, metricId=[vim.PerformanceManager.MetricId(counterId=cid, instance="*") for cid in counter_ids])
stats_res = pm.QueryStats(querySpec=[query])
for series in stats_res[0].value:
# Sum data from each network interface
if series.id.counterId == counter_ids[0]:
net_bytes_transmitted = sum(series.value)
elif series.id.counterId == counter_ids[1]:
net_bytes_received = sum(series.value)
stats["network_transmit_KBps"] = net_bytes_transmitted
stats["network_receive_KBps"] = net_bytes_received
except Exception as e:
# If obtaining performance counters fails, log the error but do not terminate
logging.warning(f"Failed to retrieve network performance data: {e}")
stats["network_transmit_KBps"] = None
stats["network_receive_KBps"] = None
return stats
def create_vm(self, name: str, cpus: int, memory_mb: int, datastore: Optional[str] = None, network: Optional[str] = None) -> str:
"""Create a new virtual machine (from scratch, with an empty disk and optional network)."""
# If a specific datastore or network is provided, update the corresponding object accordingly
datastore_obj = self.datastore_obj
network_obj = self.network_obj
if datastore:
datastore_obj = next((ds for ds in self.datacenter_obj.datastoreFolder.childEntity
if isinstance(ds, vim.Datastore) and ds.name == datastore), None)
if not datastore_obj:
raise Exception(f"Specified datastore {datastore} not found")
if network:
networks = self.datacenter_obj.networkFolder.childEntity
network_obj = next((net for net in networks if net.name == network), None)
if not network_obj:
raise Exception(f"Specified network {network} not found")
# Build VM configuration specification
vm_spec = vim.vm.ConfigSpec(name=name, memoryMB=memory_mb, numCPUs=cpus, guestId="otherGuest") # guestId can be adjusted as needed
device_specs = []
# Add SCSI controller
controller_spec = vim.vm.device.VirtualDeviceSpec()
controller_spec.operation = vim.vm.device.VirtualDeviceSpec.Operation.add
controller_spec.device = vim.vm.device.ParaVirtualSCSIController() # Using ParaVirtual SCSI controller
controller_spec.device.deviceInfo = vim.Description(label="SCSI Controller", summary="ParaVirtual SCSI Controller")
controller_spec.device.busNumber = 0
controller_spec.device.sharedBus = vim.vm.device.VirtualSCSIController.Sharing.noSharing
# Set a temporary negative key for the controller for later reference
controller_spec.device.key = -101
device_specs.append(controller_spec)
# Add virtual disk
disk_spec = vim.vm.device.VirtualDeviceSpec()
disk_spec.operation = vim.vm.device.VirtualDeviceSpec.Operation.add
disk_spec.fileOperation = vim.vm.device.VirtualDeviceSpec.FileOperation.create
disk_spec.device = vim.vm.device.VirtualDisk()
disk_spec.device.capacityInKB = 1024 * 1024 * 10 # Create a 10GB disk
disk_spec.device.deviceInfo = vim.Description(label="Hard Disk 1", summary="10 GB disk")
disk_spec.device.backing = vim.vm.device.VirtualDisk.FlatVer2BackingInfo()
disk_spec.device.backing.diskMode = "persistent"
disk_spec.device.backing.thinProvisioned = True # Thin provisioning
disk_spec.device.backing.datastore = datastore_obj
# Attach the disk to the previously created controller
disk_spec.device.controllerKey = controller_spec.device.key
disk_spec.device.unitNumber = 0
device_specs.append(disk_spec)
# If a network is provided, add a virtual network adapter
if network_obj:
nic_spec = vim.vm.device.VirtualDeviceSpec()
nic_spec.operation = vim.vm.device.VirtualDeviceSpec.Operation.add
nic_spec.device = vim.vm.device.VirtualVmxnet3() # Using VMXNET3 network adapter
nic_spec.device.deviceInfo = vim.Description(label="Network Adapter 1", summary=network_obj.name)
if isinstance(network_obj, vim.Network):
nic_spec.device.backing = vim.vm.device.VirtualEthernetCard.NetworkBackingInfo(network=network_obj, deviceName=network_obj.name)
elif isinstance(network_obj, vim.dvs.DistributedVirtualPortgroup):
# Distributed virtual switch portgroup
dvs_uuid = network_obj.config.distributedVirtualSwitch.uuid
port_key = network_obj.key
nic_spec.device.backing = vim.vm.device.VirtualEthernetCard.DistributedVirtualPortBackingInfo(
port=vim.dvs.PortConnection(portgroupKey=port_key, switchUuid=dvs_uuid)
)
nic_spec.device.connectable = vim.vm.device.VirtualDevice.ConnectInfo(startConnected=True, allowGuestControl=True)
device_specs.append(nic_spec)
vm_spec.deviceChange = device_specs
# Get the folder in which to place the VM (default is the datacenter's vmFolder)
vm_folder = self.datacenter_obj.vmFolder
# Create the VM in the specified resource pool
try:
task = vm_folder.CreateVM_Task(config=vm_spec, pool=self.resource_pool)
# Wait for the task to complete
while task.info.state not in [vim.TaskInfo.State.success, vim.TaskInfo.State.error]:
continue
if task.info.state == vim.TaskInfo.State.error:
raise task.info.error
except Exception as e:
logging.error(f"Failed to create virtual machine: {e}")
raise
logging.info(f"Virtual machine created: {name}")
return f"VM '{name}' created."
def clone_vm(self, template_name: str, new_name: str) -> str:
"""Clone a new virtual machine from an existing template or VM."""
template_vm = self.find_vm(template_name)
if not template_vm:
raise Exception(f"Template virtual machine {template_name} not found")
vm_folder = template_vm.parent # Place the new VM in the same folder as the template
if not isinstance(vm_folder, vim.Folder):
vm_folder = self.datacenter_obj.vmFolder
# Use the resource pool of the host/cluster where the template is located
resource_pool = template_vm.resourcePool or self.resource_pool
relocate_spec = vim.vm.RelocateSpec(pool=resource_pool, datastore=self.datastore_obj)
clone_spec = vim.vm.CloneSpec(powerOn=False, template=False, location=relocate_spec)
try:
task = template_vm.Clone(folder=vm_folder, name=new_name, spec=clone_spec)
while task.info.state not in [vim.TaskInfo.State.success, vim.TaskInfo.State.error]:
continue
if task.info.state == vim.TaskInfo.State.error:
raise task.info.error
except Exception as e:
logging.error(f"Failed to clone virtual machine: {e}")
raise
logging.info(f"Cloned virtual machine {template_name} to new VM: {new_name}")
return f"VM '{new_name}' cloned from '{template_name}'."
def delete_vm(self, name: str) -> str:
"""Delete the specified virtual machine."""
vm = self.find_vm(name)
if not vm:
raise Exception(f"Virtual machine {name} not found")
try:
task = vm.Destroy_Task()
while task.info.state not in [vim.TaskInfo.State.success, vim.TaskInfo.State.error]:
continue
if task.info.state == vim.TaskInfo.State.error:
raise task.info.error
except Exception as e:
logging.error(f"Failed to delete virtual machine: {e}")
raise
logging.info(f"Virtual machine deleted: {name}")
return f"VM '{name}' deleted."
def power_on_vm(self, name: str) -> str:
"""Power on the specified virtual machine."""
vm = self.find_vm(name)
if not vm:
raise Exception(f"Virtual machine {name} not found")
if vm.runtime.powerState == vim.VirtualMachine.PowerState.poweredOn:
return f"VM '{name}' is already powered on."
task = vm.PowerOnVM_Task()
while task.info.state not in [vim.TaskInfo.State.success, vim.TaskInfo.State.error]:
continue
if task.info.state == vim.TaskInfo.State.error:
raise task.info.error
logging.info(f"Virtual machine powered on: {name}")
return f"VM '{name}' powered on."
def power_off_vm(self, name: str) -> str:
"""Power off the specified virtual machine."""
vm = self.find_vm(name)
if not vm:
raise Exception(f"Virtual machine {name} not found")
if vm.runtime.powerState == vim.VirtualMachine.PowerState.poweredOff:
return f"VM '{name}' is already powered off."
task = vm.PowerOffVM_Task()
while task.info.state not in [vim.TaskInfo.State.success, vim.TaskInfo.State.error]:
continue
if task.info.state == vim.TaskInfo.State.error:
raise task.info.error
logging.info(f"Virtual machine powered off: {name}")
return f"VM '{name}' powered off."
# ---------------- MCP Server Definition ----------------
# Initialize MCP Server object
mcp_server = Server(name="VMware-MCP-Server", version="0.0.1")
# Define supported tools (executable operations) and resources (data interfaces)
# The implementation of tools and resources will call methods in VMwareManager
# Note: For each operation, perform API key authentication check, and only execute sensitive operations if the authenticated flag is True
# If not authenticated, an exception is raised
# Tool 1: Authentication (via API Key)
def tool_authenticate(key: str) -> str:
"""Validate the API key and enable subsequent operations upon success."""
if config.api_key and key == config.api_key:
manager.authenticated = True
logging.info("API key verification successful, client is authorized")
return "Authentication successful."
else:
logging.warning("API key verification failed")
raise Exception("Authentication failed: invalid API key.")
# Tool 2: Create virtual machine
def tool_create_vm(name: str, cpu: int, memory: int, datastore: str = None, network: str = None) -> str:
"""Create a new virtual machine."""
_check_auth() # Check access permissions
return manager.create_vm(name, cpu, memory, datastore, network)
# Tool 3: Clone virtual machine
def tool_clone_vm(template_name: str, new_name: str) -> str:
"""Clone a virtual machine from a template."""
_check_auth()
return manager.clone_vm(template_name, new_name)
# Tool 4: Delete virtual machine
def tool_delete_vm(name: str) -> str:
"""Delete the specified virtual machine."""
_check_auth()
return manager.delete_vm(name)
# Tool 5: Power on virtual machine
def tool_power_on(name: str) -> str:
"""Power on the specified virtual machine."""
_check_auth()
return manager.power_on_vm(name)
# Tool 6: Power off virtual machine
def tool_power_off(name: str) -> str:
"""Power off the specified virtual machine."""
_check_auth()
return manager.power_off_vm(name)
# Tool 7: List all virtual machines
def tool_list_vms() -> list:
"""Return a list of all virtual machine names."""
_check_auth()
return manager.list_vms()
# Resource 1: Retrieve virtual machine performance data
def resource_vm_performance(vm_name: str) -> dict:
"""Retrieve CPU, memory, storage, and network usage for the specified virtual machine."""
_check_auth()
return manager.get_vm_performance(vm_name)
# Internal helper: Check API access permissions
def _check_auth():
if config.api_key:
# If an API key is configured, require that manager.authenticated is True
if not manager.authenticated:
raise Exception("Unauthorized: API key required.")
# Register the above functions as tools and resources for the MCP Server
# Encapsulate using mcp.types.Tool and mcp.types.Resource
tools = {
"authenticate": types.Tool(
name="authenticate",
description="Authenticate using API key to enable privileged operations",
parameters={"key": str},
handler=lambda params: tool_authenticate(**params),
inputSchema={"type": "object", "properties": {"key": {"type": "string"}}, "required": ["key"]}
),
"createVM": types.Tool(
name="createVM",
description="Create a new virtual machine",
parameters={"name": str, "cpu": int, "memory": int, "datastore": Optional[str], "network": Optional[str]},
handler=lambda params: tool_create_vm(**params),
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string"},
"cpu": {"type": "integer"},
"memory": {"type": "integer"},
"datastore": {"type": "string", "nullable": True},
"network": {"type": "string", "nullable": True}
},
"required": ["name", "cpu", "memory"]
}
),
"cloneVM": types.Tool(
name="cloneVM",
description="Clone a virtual machine from a template or existing VM",
parameters={"template_name": str, "new_name": str},
handler=lambda params: tool_clone_vm(**params),
inputSchema={
"type": "object",
"properties": {
"template_name": {"type": "string"},
"new_name": {"type": "string"}
},
"required": ["template_name", "new_name"]
}
),
"deleteVM": types.Tool(
name="deleteVM",
description="Delete a virtual machine",
parameters={"name": str},
handler=lambda params: tool_delete_vm(**params),
inputSchema={
"type": "object",
"properties": {"name": {"type": "string"}},
"required": ["name"]
}
),
"powerOn": types.Tool(
name="powerOn",
description="Power on a virtual machine",
parameters={"name": str},
handler=lambda params: tool_power_on(**params),
inputSchema={
"type": "object",
"properties": {"name": {"type": "string"}},
"required": ["name"]
}
),
"powerOff": types.Tool(
name="powerOff",
description="Power off a virtual machine",
parameters={"name": str},
handler=lambda params: tool_power_off(**params),
inputSchema={
"type": "object",
"properties": {"name": {"type": "string"}},
"required": ["name"]
}
),
"listVMs": types.Tool(
name="listVMs",
description="List all virtual machines",
parameters={},
handler=lambda params: tool_list_vms(),
inputSchema={"type": "object", "properties": {}}
)
}
resources = {
"vmStats": types.Resource(
name="vmStats",
uri="vmstats://{vm_name}",
description="Get CPU, memory, storage, network usage of a VM",
parameters={"vm_name": str},
handler=lambda params: resource_vm_performance(**params),
inputSchema={
"type": "object",
"properties": {
"vm_name": {"type": "string"}
},
"required": ["vm_name"]
}
)
}
# Add tools and resources to the MCP Server object
for name, tool in tools.items():
setattr(mcp_server, f"tool_{name}", tool)
for name, res in resources.items():
setattr(mcp_server, f"resource_{name}", res)
# Set the MCP Server capabilities, declaring that the tools and resources list is available
mcp_server.capabilities = {
"tools": {"listChanged": True},
"resources": {"listChanged": True}
}
# Maintain a global SSE transport instance for sending events during POST request processing
active_transport: Optional[SseServerTransport] = None
# SSE initialization request handler (HTTP GET /sse)
async def sse_endpoint(scope, receive, send):
"""Handle SSE connection initialization requests. Establish an MCP SSE session."""
global active_transport
# Construct response headers to establish an event stream
headers = [(b"content-type", b"text/event-stream")]
# Verify API key: Retrieve from request headers 'Authorization' or 'X-API-Key'
headers_dict = {k.lower().decode(): v.decode() for (k, v) in scope.get("headers", [])}
provided_key = None
if b"authorization" in scope["headers"]:
provided_key = headers_dict.get("authorization")
elif b"x-api-key" in scope["headers"]:
provided_key = headers_dict.get("x-api-key")
if config.api_key and provided_key != f"Bearer {config.api_key}" and provided_key != config.api_key:
# If the correct API key is not provided, return 401
res_status = b"401 UNAUTHORIZED"
await send({"type": "http.response.start", "status": 401, "headers": [(b"content-type", b"text/plain")]})
await send({"type": "http.response.body", "body": b"Unauthorized"})
logging.warning("No valid API key provided, rejecting SSE connection")
return
# Establish SSE transport and connect to the MCP Server
active_transport = SseServerTransport("/sse/messages")
logging.info("Established new SSE session")
# Send SSE response headers to the client, preparing to start sending events
await send({"type": "http.response.start", "status": 200, "headers": headers})
try:
async with active_transport.connect_sse(scope, receive, send) as (read_stream, write_stream):
init_opts = mcp_server.create_initialization_options()
# Run MCP Server, passing the read/write streams to the Server
await mcp_server.run(read_stream, write_stream, init_opts)
except Exception as e:
logging.error(f"SSE session encountered an error: {e}")
finally:
active_transport = None
# SSE session ended, send an empty message to indicate completion
await send({"type": "http.response.body", "body": b"", "more_body": False})
# JSON-RPC message handler (HTTP POST /sse/messages)
async def messages_endpoint(scope, receive, send):
"""Handle JSON-RPC requests sent by the client (via POST)."""
global active_transport
# Read request body data
body_bytes = b''
more_body = True
while more_body:
event = await receive()
if event["type"] == "http.request":
body_bytes += event.get("body", b'')
more_body = event.get("more_body", False)
# Parse JSON-RPC request
try:
body_str = body_bytes.decode('utf-8')
msg = json.loads(body_str)
except Exception as e:
logging.error(f"JSON parsing failed: {e}")
await send({"type": "http.response.start", "status": 400,
"headers": [(b"content-type", b"text/plain")]})
await send({"type": "http.response.body", "body": b"Invalid JSON"})
return
# Only accept requests sent through an established SSE transport
if not active_transport:
await send({"type": "http.response.start", "status": 400,
"headers": [(b"content-type", b"text/plain")]})
await send({"type": "http.response.body", "body": b"No active session"})
return
# Pass the POST request content to active_transport to trigger the corresponding MCP Server operation
try:
# Handle the POST message through SseServerTransport, which injects the request into the MCP session
await active_transport.handle_post(scope, body_bytes)
status = 200
response_body = b""
except Exception as e:
logging.error(f"Error handling POST message: {e}")
status = 500
response_body = str(e).encode('utf-8')
# Reply to the client with HTTP status
await send({"type": "http.response.start", "status": status,
"headers": [(b"content-type", b"text/plain")]})
await send({"type": "http.response.body", "body": response_body})
# Simple ASGI application routing: dispatch requests to the appropriate handler based on the path and method
async def app(scope, receive, send):
if scope["type"] == "http":
path = scope.get("path", "")
method = scope.get("method", "").upper()
if path == "/sse" and method == "GET":
# SSE initialization request
await sse_endpoint(scope, receive, send)
elif path == "/sse/messages" and method in ("POST", "OPTIONS"):
# JSON-RPC message request; handle CORS preflight OPTIONS request
if method == "OPTIONS":
# Return allowed methods
headers = [
(b"access-control-allow-methods", b"POST, OPTIONS"),
(b"access-control-allow-headers", b"Content-Type, Authorization, X-API-Key"),
(b"access-control-allow-origin", b"*")
]
await send({"type": "http.response.start", "status": 204, "headers": headers})
await send({"type": "http.response.body", "body": b""})
else:
await messages_endpoint(scope, receive, send)
else:
# Route not found
await send({"type": "http.response.start", "status": 404,
"headers": [(b"content-type", b"text/plain")]})
await send({"type": "http.response.body", "body": b"Not Found"})
else:
# Non-HTTP event, do not process
return
# Parse command-line arguments and environment variables, and load configuration
parser = argparse.ArgumentParser(description="MCP VMware ESXi Management Server")
parser.add_argument("--config", "-c", help="Configuration file path (JSON or YAML)", default=None)
args = parser.parse_args()
# Attempt to load configuration from a file or environment variables
config_data = {}
config_path = args.config or os.environ.get("MCP_CONFIG_FILE")
if config_path:
# Parse JSON or YAML based on the file extension
if config_path.endswith((".yml", ".yaml")):
import yaml
with open(config_path, 'r') as f:
config_data = yaml.safe_load(f)
elif config_path.endswith(".json"):
with open(config_path, 'r') as f:
config_data = json.load(f)
else:
raise ValueError("Unsupported configuration file format. Please use JSON or YAML")
# Override configuration from environment variables (higher priority than file)
env_map = {
"VCENTER_HOST": "vcenter_host",
"VCENTER_USER": "vcenter_user",
"VCENTER_PASSWORD": "vcenter_password",
"VCENTER_DATACENTER": "datacenter",
"VCENTER_CLUSTER": "cluster",
"VCENTER_DATASTORE": "datastore",
"VCENTER_NETWORK": "network",
"VCENTER_INSECURE": "insecure",
"MCP_API_KEY": "api_key",
"MCP_LOG_FILE": "log_file",
"MCP_LOG_LEVEL": "log_level"
}
for env_key, cfg_key in env_map.items():
if env_key in os.environ:
val = os.environ[env_key]
# Boolean type conversion
if cfg_key == "insecure":
config_data[cfg_key] = val.lower() in ("1", "true", "yes")
else:
config_data[cfg_key] = val
# Construct Config object from config_data
required_keys = ["vcenter_host", "vcenter_user", "vcenter_password"]
for k in required_keys:
if k not in config_data or not config_data[k]:
raise Exception(f"Missing required configuration item: {k}")
config = Config(**config_data)
# Initialize logging
log_level = getattr(logging, config.log_level.upper(), logging.INFO)
logging.basicConfig(level=log_level,
format="%(asctime)s [%(levelname)s] %(message)s",
filename=config.log_file if config.log_file else None)
if not config.log_file:
# If no log file is specified, output logs to the console
logging.getLogger().addHandler(logging.StreamHandler())
logging.info("Starting VMware ESXi Management MCP Server...")
# Create VMware Manager instance and connect
manager = VMwareManager(config)
# If an API key is configured, prompt that authentication is required before invoking sensitive operations
if config.api_key:
logging.info("API key authentication is enabled. Clients must call the authenticate tool to verify the key before invoking sensitive operations")
# Start ASGI server to listen for MCP SSE connections
if __name__ == "__main__":
# Start ASGI application using the built-in uvicorn server (listening on 0.0.0.0:8080)
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8080)