Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 72 additions & 75 deletions confdgnmi/src/confd_gnmi_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
import logging
import sys
import json
from time import sleep
from contextlib import closing
import asyncio

from grpc import channel_ready_future, insecure_channel
import gnmi.proto as gp
import betterproto
import grpclib.client

import gnmi_pb2
from confd_gnmi_common import HOST, PORT, make_xpath_path, VERSION, \
common_optparse_options, common_optparse_process, make_gnmi_path, \
get_data_type, get_sub_mode
from gnmi_pb2_grpc import gNMIStub

log = logging.getLogger('confd_gnmi_client')

Expand All @@ -23,43 +23,41 @@ def __init__(self, host=HOST, port=PORT, metadata=None):
if metadata is None:
metadata = [('username', 'admin'), ('password', 'admin')]
log.info("==> host=%s, port=%i, metadata-%s", host, port, metadata)
self.channel = insecure_channel("{}:{}".format(host, port))
channel_ready_future(self.channel).result(timeout=5)
self.channel = grpclib.client.Channel(host="127.0.0.1", port=50061, ssl=None)
self.service = gp.gNMIStub(
self.channel, metadata={"username": "admin", "password": "admin"})
self.metadata = metadata
self.stub = gNMIStub(self.channel)
log.info("<== self.stub=%s", self.stub)
log.info("<== self.service=%s", self.service)

def close(self):
self.channel.close()

def get_capabilities(self):
async def get_capabilities(self):
log.info("==>")
request = gnmi_pb2.CapabilityRequest()
log.debug("Calling stub.Capabilities")
response = self.stub.Capabilities(request, metadata=self.metadata)
response = await self.service.capabilities()
log.info("<== response.supported_models=%s", response.supported_models)
return response.supported_models

@staticmethod
def make_subscription_list(prefix, paths, mode):
log.debug("==> mode=%s", mode)
qos = gnmi_pb2.QOSMarking(marking=1)
qos = gp.QoSMarking(marking=1)
subscriptions = []
for path in paths:
if mode == gnmi_pb2.SubscriptionList.STREAM:
sub = gnmi_pb2.Subscription(path=path,
mode=gnmi_pb2.SubscriptionMode.ON_CHANGE)
if mode == gp.SubscriptionListMode.STREAM:
sub = gp.Subscription(path=path, mode=gp.SubscriptionMode.ON_CHANGE)
else:
sub = gnmi_pb2.Subscription(path=path)
sub = gp.Subscription(path=path)
subscriptions.append(sub)
subscription_list = gnmi_pb2.SubscriptionList(
subscription_list = gp.SubscriptionList(
prefix=prefix,
subscription=subscriptions,
qos=qos,
mode=mode,
allow_aggregation=False,
use_models=[],
encoding=gnmi_pb2.Encoding.BYTES,
encoding=gp.Encoding.BYTES,
updates_only=False
)

Expand All @@ -69,23 +67,22 @@ def make_subscription_list(prefix, paths, mode):
@staticmethod
def make_poll_subscription():
log.debug("==>")
sub = gnmi_pb2.SubscribeRequest(poll=gnmi_pb2.Poll(), extension=[])
sub = gp.SubscribeRequest(poll=gp.Poll(), extension=[])
log.debug("<==")
return sub

@staticmethod
def generate_subscriptions(subscription_list, poll_interval=0.0,
poll_count=0):
async def generate_subscriptions(subscription_list, poll_interval=0.0,
poll_count=0):
log.debug("==> subscription_list=%s", subscription_list)

sub = gnmi_pb2.SubscribeRequest(subscribe=subscription_list,
extension=[])
sub = gp.SubscribeRequest(subscribe=subscription_list, extension=[])
log.debug("subscription_list.mode=%s", subscription_list.mode)
yield sub

if subscription_list.mode == gnmi_pb2.SubscriptionList.POLL:
if subscription_list.mode == gp.SubscriptionListMode.POLL:
for i in range(poll_count):
sleep(poll_interval)
await asyncio.sleep(poll_interval)
log.debug("Generating POLL subscription")
yield ConfDgNMIClient.make_poll_subscription()

Expand All @@ -97,20 +94,21 @@ def print_notification(n):
print("timestamp {} prefix {} atomic {}".format(n.timestamp, pfx_str, n.atomic))
print("Updates:")
for u in n.update:
if u.val.json_val:
value = json.loads(u.val.json_val)
elif u.val.json_ietf_val:
value = json.loads(u.val.json_ietf_val)
field, fvalue = betterproto.which_one_of(u.val, 'value')
if field == 'json_val':
value = json.loads(fvalue)
elif field == 'json_ietf_val':
value = json.loads(fvalue)
else:
value = str(u.val)
value = str(fvalue)
print("path: {} value {}".format(pfx_str + make_xpath_path(u.path), value))

@staticmethod
def read_subscribe_responses(responses, read_count=-1):
async def read_subscribe_responses(responses, read_count=-1):
log.info("==> read_count=%s", read_count)
# example to cancel
# responses.cancel()
for response in responses:
async for response in responses:
log.info("******* Subscription received response=%s read_count=%i",
response, read_count)
print("subscribe - response read_count={}".format(read_count))
Expand All @@ -122,52 +120,42 @@ def read_subscribe_responses(responses, read_count=-1):
log.info("Canceling read")
# See https://stackoverflow.com/questions/54588382/
# how-can-a-grpc-server-notice-that-the-client-has-cancelled-a-server-side-streami
responses.cancel()
# responses.cancel()

log.info("<==")

# TODO this API would change with more subscription support
def subscribe(self, subscription_list, read_fun=None,
poll_interval=0.0, poll_count=0, read_count=-1):
async def subscribe(self, subscription_list, read_fun=None,
poll_interval=0.0, poll_count=0, read_count=-1):
log.info("==>")
responses = self.stub.Subscribe(
ConfDgNMIClient.generate_subscriptions(subscription_list,
poll_interval, poll_count),
metadata=self.metadata)
responses = self.service.subscribe(
ConfDgNMIClient.generate_subscriptions(subscription_list, poll_interval, poll_count))
if read_fun is not None:
read_fun(responses, read_count)
await read_fun(responses, read_count)
log.info("<== responses=%s", responses)
return responses

def get(self, prefix, paths, get_type, encoding):
async def get(self, prefix, paths, get_type, encoding):
log.info("==>")
path = []
for p in paths:
path.append(p)
request = gnmi_pb2.GetRequest(prefix=prefix, path=path,
type=get_type,
encoding=encoding,
extension=[])
response = self.stub.Get(request, metadata=self.metadata)

response = await self.service.get(path=p, type=get_type, encoding=encoding, extension=[])
log.info("<== response.notification=%s", response.notification)
return response.notification

def set(self, prefix, path_vals):
async def set(self, prefix, path_vals):
log.info("==> prefix=%s path_vals=%s", prefix, path_vals)
update = []
for pv in path_vals:
up = gnmi_pb2.Update(path=pv[0], val=pv[1])
update.append(up)
request = gnmi_pb2.SetRequest(prefix=prefix, update=update)
response = self.stub.Set(request, metadata=self.metadata)
response = await self.service.set(prefix=prefix,
update=[gp.Update(path=pv[0], val=pv[1])
for pv in path_vals])

log.info("<== response=%s", response)
return response

def delete(self, prefix, paths):
async def delete(self, prefix, paths):
log.info("==> prefix=%s paths=%s", prefix, paths)
request = gnmi_pb2.SetRequest(prefix=prefix, delete=paths)
response = self.stub.Set(request, metadata=self.metadata)
response = await self.service.set(prefix=prefix, delete=paths)
log.info("<== response=%s", response)
return response

Expand Down Expand Up @@ -217,15 +205,15 @@ def parse_args(args):
return opt


if __name__ == '__main__':
async def main():
opt = parse_args(args=sys.argv[1:])
common_optparse_process(opt, log)
log.debug("opt=%s", opt)
log.info("paths=%s vals=%s", opt.paths, opt.vals)
prefix_str = opt.prefix
prefix = make_gnmi_path(prefix_str)
paths = [make_gnmi_path(p) for p in opt.paths]
vals = [gnmi_pb2.TypedValue(json_ietf_val=v.encode()) for v in opt.vals]
vals = [gp.TypedValue(json_ietf_val=v.encode()) for v in opt.vals]

datatype = get_data_type(opt.datatype)
subscription_mode = get_sub_mode(opt.submode)
Expand All @@ -238,29 +226,34 @@ def parse_args(args):
datatype, subscription_mode, poll_interval, poll_count,
read_count)

encoding = dict(BYTES=gnmi_pb2.Encoding.BYTES,
JSON=gnmi_pb2.Encoding.JSON,
JSON_IETF=gnmi_pb2.Encoding.JSON_IETF)[opt.encoding]
encoding = dict(BYTES=gp.Encoding.BYTES,
JSON=gp.Encoding.JSON,
JSON_IETF=gp.Encoding.JSON_IETF)[opt.encoding]
subscription_list = ConfDgNMIClient.make_subscription_list(
prefix, paths, subscription_mode)

with closing(ConfDgNMIClient(HOST, PORT)) as client:
if opt.operation == "capabilities":
supported_models = client.get_capabilities()
print("Capabilities - supported models:")
for m in supported_models:
print("name:{} organization:{} version: {}".format(m.name,
m.organization,
m.version))
print('get caps')
try:
supported_models = await client.get_capabilities()
print("Capabilities - supported models:")
for m in supported_models:
print("name:{} organization:{} version: {}".format(m.name,
m.organization,
m.version))
except Exception as ex:
print('failed', ex)

elif opt.operation == "subscribe":
print("Starting subscription ....")
client.subscribe(subscription_list,
read_fun=ConfDgNMIClient.read_subscribe_responses,
poll_interval=poll_interval, poll_count=poll_count,
read_count=read_count)
await client.subscribe(subscription_list,
read_fun=ConfDgNMIClient.read_subscribe_responses,
poll_interval=poll_interval, poll_count=poll_count,
read_count=read_count)
print(".... subscription done")
elif opt.operation == "get":
notification = client.get(prefix, paths, datatype, encoding)
notification = await client.get(prefix, paths, datatype, encoding)
print("Get - Notifications:")
for n in notification:
ConfDgNMIClient.print_notification(n)
Expand All @@ -272,9 +265,9 @@ def parse_args(args):
raise RuntimeError(
"Number of paths (--path) must be the same as number of vals (--val)!")
else:
response = client.set(prefix, list(zip(paths, vals)))
response = await client.set(prefix, list(zip(paths, vals)))
else:
response = client.delete(prefix, paths)
response = await client.delete(prefix, paths)
print("Set - UpdateResult:")
print("timestamp {} prefix {}".format(response.timestamp,
make_xpath_path(
Expand All @@ -286,3 +279,7 @@ def parse_args(args):
r.path)))
else:
log.warning("Unknown operation %s", opt.operation)


if __name__ == '__main__':
asyncio.run(main())
38 changes: 19 additions & 19 deletions confdgnmi/src/confd_gnmi_common.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import logging
from typing import Tuple, Dict

import gnmi_pb2
import gnmi.proto as gp

VERSION = "0.2.0"
HOST = "localhost"
PORT = 50061
logging.basicConfig(
format='%(asctime)s:%(relativeCreated)s %(levelname)s:%(filename)s:%(lineno)s:%(funcName)s %(message)s [%(threadName)s]',
level=logging.WARNING)
format='%(asctime)s %(levelname)s:%(filename)s:%(lineno)s:%(funcName)s %(message)s [%(threadName)s]',
level=logging.DEBUG)
log = logging.getLogger('confd_gnmi_common')


Expand Down Expand Up @@ -66,7 +66,7 @@ def make_name_keys(elem_string) -> Tuple[str, Dict[str, str]]:
# Crate gNMI Path object from string representation of path
# see: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md#222-paths
# TODO tests
def make_gnmi_path(xpath_string, origin=None, target=None) -> gnmi_pb2.Path:
def make_gnmi_path(xpath_string, origin=None, target=None) -> gp.Path:
"""
Create gnmi path from string path
:param xpath_string:
Expand All @@ -82,9 +82,9 @@ def make_gnmi_path(xpath_string, origin=None, target=None) -> gnmi_pb2.Path:
for e in elem_strings:
if e != '':
(name, keys) = make_name_keys(e)
elem = gnmi_pb2.PathElem(name=name, key=keys)
elem = gp.PathElem(name=name, key=keys)
elems.append(elem)
path = gnmi_pb2.Path(elem=elems, target=target, origin=origin)
path = gp.Path(elem=elems, target=target, origin=origin)
log.debug("<== path=%s", path)
return path

Expand Down Expand Up @@ -161,32 +161,32 @@ def make_formatted_path(gnmi_path, gnmi_prefix=None, quote_val=False) -> str:


def add_path_prefix(path, prefix):
return gnmi_pb2.Path(elem=list(prefix.elem) + list(path.elem),
origin=path.origin,
target=path.target)
return gp.Path(elem=list(prefix.elem) + list(path.elem),
origin=path.origin,
target=path.target)


def remove_path_prefix(path, prefix):
assert path.elem[:len(prefix.elem)] == prefix.elem[:]
return gnmi_pb2.Path(elem=path.elem[len(prefix.elem):],
origin=path.origin,
target=path.target)
return gp.Path(elem=path.elem[len(prefix.elem):],
origin=path.origin,
target=path.target)


def get_data_type(datatype_str):
datatype_map = {
"ALL": gnmi_pb2.GetRequest.DataType.ALL,
"CONFIG": gnmi_pb2.GetRequest.DataType.CONFIG,
"STATE": gnmi_pb2.GetRequest.DataType.STATE,
"OPERATIONAL": gnmi_pb2.GetRequest.DataType.OPERATIONAL,
"ALL": gp.GetRequestDataType.ALL,
"CONFIG": gp.GetRequestDataType.CONFIG,
"STATE": gp.GetRequestDataType.STATE,
"OPERATIONAL": gp.GetRequestDataType.OPERATIONAL,
}
return datatype_map[datatype_str]


def get_sub_mode(mode_str):
mode_map = {
"ONCE": gnmi_pb2.SubscriptionList.ONCE,
"POLL": gnmi_pb2.SubscriptionList.POLL,
"STREAM": gnmi_pb2.SubscriptionList.STREAM,
"ONCE": gp.SubscriptionListMode.ONCE,
"POLL": gp.SubscriptionListMode.POLL,
"STREAM": gp.SubscriptionListMode.STREAM,
}
return mode_map[mode_str]