Examples

Code examples can be found in the examples directory on GitHub

Also take a look at the unit tests in the tests directory.

Get all calling users

Source: calling_users.py

 1#!/usr/bin/env python
 2"""
 3Example script
 4Get all calling users within the org
 5"""
 6
 7from dotenv import load_dotenv
 8
 9from wxc_sdk import WebexSimpleApi
10
11load_dotenv()
12
13api = WebexSimpleApi()
14
15# using wxc_sdk.people.PeopleApi.list to iterate over persons
16# Parameter calling_data needs to be set to true to gat calling specific information
17# calling users have the attribute location_id set
18calling_users = [user for user in api.people.list(calling_data=True)
19                 if user.location_id]
20print(f'{len(calling_users)} users:')
21print('\n'.join(user.display_name for user in calling_users))

Get all calling users (async variant)

Source: calling_users_async.py

 1#!/usr/bin/env python
 2"""
 3Example script
 4Get all calling users within the org using the (experimental) async API
 5"""
 6import asyncio
 7import time
 8
 9from dotenv import load_dotenv
10
11from wxc_sdk.as_api import AsWebexSimpleApi
12
13load_dotenv()
14
15
16async def get_calling_users():
17    """
18    Get details of all calling enabled users by:
19    1) getting all calling users
20    2) collecting all users that have a calling license
21    3) getting details for all users
22    """
23    async with AsWebexSimpleApi(concurrent_requests=40) as api:
24        print('Collecting calling licenses')
25        calling_license_ids = set(lic.license_id for lic in await api.licenses.list()
26                                  if lic.webex_calling)
27
28        # get users with a calling license
29        calling_users = [user async for user in api.people.list_gen()
30                         if any(lic_id in calling_license_ids for lic_id in user.licenses)]
31        print(f'{len(calling_users)} users:')
32        print('\n'.join(user.display_name for user in calling_users))
33
34        # get details for all users
35        start = time.perf_counter()
36        details = await asyncio.gather(*[api.people.details(person_id=user.person_id, calling_data=True)
37                                         for user in calling_users])
38        expired = time.perf_counter() - start
39        print(f'Got details for {len(details)} users in {expired * 1000:.3f} ms')
40
41
42asyncio.run(get_calling_users())

Get all users without phones

Source: users_wo_devices.py

  1#!/usr/bin/env python
  2"""
  3Get calling users without devices
  4"""
  5import asyncio
  6import logging
  7import os
  8from itertools import chain
  9from typing import Optional
 10
 11from dotenv import load_dotenv
 12
 13from wxc_sdk import Tokens
 14from wxc_sdk.as_api import AsWebexSimpleApi
 15from wxc_sdk.common import UserType
 16from wxc_sdk.integration import Integration
 17from wxc_sdk.person_settings import PersonDevicesResponse
 18from wxc_sdk.scopes import parse_scopes
 19
 20
 21def env_path() -> str:
 22    """
 23    determine path for .env to load environment variables from; based on name of this file
 24    :return: .env file path
 25    """
 26    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
 27
 28
 29def yml_path() -> str:
 30    """
 31    determine path of YML file to persist tokens
 32    :return: path to YML file
 33    :rtype: str
 34    """
 35    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
 36
 37
 38def build_integration() -> Integration:
 39    """
 40    read integration parameters from environment variables and create an integration
 41    :return: :class:`wxc_sdk.integration.Integration` instance
 42    """
 43    client_id = os.getenv('INTEGRATION_CLIENT_ID')
 44    client_secret = os.getenv('INTEGRATION_CLIENT_SECRET')
 45    scopes = parse_scopes(os.getenv('INTEGRATION_SCOPES'))
 46    redirect_url = 'http://localhost:6001/redirect'
 47    if not all((client_id, client_secret, scopes)):
 48        raise ValueError('failed to get integration parameters from environment')
 49    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
 50                       redirect_url=redirect_url)
 51
 52
 53def get_tokens() -> Optional[Tokens]:
 54    """
 55    Tokens are read from a YML file. If needed an OAuth flow is initiated.
 56
 57    :return: tokens
 58    :rtype: :class:`wxc_sdk.tokens.Tokens`
 59    """
 60
 61    integration = build_integration()
 62    tokens = integration.get_cached_tokens_from_yml(yml_path=yml_path())
 63    return tokens
 64
 65
 66async def main():
 67    # get environment variables from .env; required for integration parameters
 68    load_dotenv(env_path())
 69
 70    # get tokens from cache or create a new set of tokens using the integration defined in .env
 71    tokens = get_tokens()
 72
 73    async with AsWebexSimpleApi(tokens=tokens) as api:
 74        # get calling users
 75        calling_users = [user for user in await api.people.list(calling_data=True)
 76                         if user.location_id]
 77
 78        # get device info for all users
 79        user_device_infos = await asyncio.gather(*[api.person_settings.devices(person_id=user.person_id)
 80                                                   for user in calling_users])
 81        user_device_infos: list[PersonDevicesResponse]
 82        users_wo_devices = [user for user, device_info in zip(calling_users, user_device_infos)
 83                            if not device_info.devices]
 84
 85        # alternatively we can collect all device owners
 86        device_owner_ids = set(owner.owner_id
 87                               for device in chain.from_iterable(udi.devices for udi in user_device_infos)
 88                               if (owner := device.owner) and owner.owner_type == UserType.people)
 89
 90        # ... and collect all other users (the ones not owning a device)
 91        users_not_owning_a_device = [user for user in calling_users
 92                                     if user.person_id not in device_owner_ids]
 93
 94    users_wo_devices.sort(key=lambda u: u.display_name)
 95    print(f'{len(users_wo_devices)} users w/o devices:')
 96    print('\n'.join(f'{user.display_name} ({user.emails[0]})'
 97                    for user in users_wo_devices))
 98
 99    print()
100    users_not_owning_a_device.sort(key=lambda u: u.display_name)
101    print(f'{len(users_not_owning_a_device)} users not owning a device:')
102    print('\n'.join(f'{user.display_name} ({user.emails[0]})'
103                    for user in users_not_owning_a_device))
104
105
106if __name__ == '__main__':
107    # enable DEBUG logging to a file; REST log shows all requests
108    logging.basicConfig(filename=os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.log'),
109                        filemode='w', level=logging.DEBUG, format='%(asctime)s %(threadName)s %(message)s')
110    asyncio.run(main())

Default call forwarding settings for all users

This example start with the list of all calling users and then calls wxc_sdk.person_settings.forwarding.PersonForwardingApi.configure() for each user. To speed up things a ThreadPoolExecutor is used and all update operations are scheduled for execution by the thread pool.

Source: reset_call_forwarding.py

 1#!/usr/bin/env python
 2"""
 3Example script
 4Reset call forwarding to default for all users in the org
 5"""
 6import asyncio
 7import logging
 8import time
 9
10from dotenv import load_dotenv
11
12from wxc_sdk.all_types import PersonForwardingSetting
13from wxc_sdk.as_api import AsWebexSimpleApi
14
15
16async def main():
17    # load environment. SDk fetches the access token from WEBEX_ACCESS_TOKEN environment variable
18    load_dotenv()
19
20    logging.basicConfig(level=logging.INFO)
21
22    # set to DEBUG to see the actual requests
23    logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)
24
25    async with AsWebexSimpleApi() as api:
26
27        # get all calling users
28        start = time.perf_counter_ns()
29        calling_users = [user for user in await api.people.list(calling_data=True)
30                         if user.location_id]
31        print(f'Got {len(calling_users)} calling users in '
32              f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')
33
34        # set call forwarding to default for all users
35        # default call forwarding settings
36        forwarding = PersonForwardingSetting.default()
37
38        # schedule update for each user and wait for completion
39        start = time.perf_counter_ns()
40        await asyncio.gather(*[api.person_settings.forwarding.configure(person_id=user.person_id,
41                                                                        forwarding=forwarding)
42                               for user in calling_users])
43        print(f'Reset call forwarding to default for {len(calling_users)} users in '
44              f'{(time.perf_counter_ns() - start) / 1e6:.3f} ms')
45
46if __name__ == '__main__':
47    asyncio.run(main())

Modify number of rings configuration for users read from CSV

Here we read a bunch of user email addresses from a CSV. The CSV as an ERROR column and we only want to consider users without errors.

For all these users a VM setting is updated and the results are written to a CSV for further processing.

Source: modify_voicemail.py

 1#!/usr/bin/env python
 2"""
 3Example Script
 4Modifying number of rings configuration in voicemail settings
 5Run -> python3 modify_voicemail.py modify_voicemail.csv
 6"""
 7import csv
 8import sys
 9import traceback
10from concurrent.futures import ThreadPoolExecutor
11
12from dotenv import load_dotenv
13
14from wxc_sdk import WebexSimpleApi
15from wxc_sdk.all_types import *
16
17VOICEMAIL_SETTINGS_NUMBER_OF_RINGS = 6
18
19# loading environment variables - use .env file for development
20load_dotenv()
21
22
23def update_vm_settings():
24    """
25    actually update VM settings for all users present in input CSV
26    """
27    api = WebexSimpleApi()
28    final_report = []
29    mail_ids = []
30    # using wxc_sdk.people.PeopleApi.list to iterate over persons
31    # Parameter calling_data needs to be set to true to gat calling specific information
32    # calling users have the attribute location_id set
33    calling_users = [user for user in api.people.list(calling_data=True)
34                     if user.location_id]
35    print(f'{len(calling_users)} users:')
36    print('\n'.join(user.display_name for user in calling_users))
37
38    # get CSV file name from command line
39    with open(str(sys.argv[1]), 'r') as csv_file:
40        reader = csv.DictReader(csv_file)
41        # read all records from CSV. Ony consider records w/o error
42        for col in reader:
43            if not col['ERROR']:
44                # collect email address from USERNAME column for further processing
45                mail_ids.append(col['USERNAME'])
46            else:
47                final_report.append((col['USERNAME'], 'FAILED DUE TO ERROR REASON IN INPUT FILE'))
48
49    # work on calling users that have an email address that we read from the CSV
50    filteredUsers = [d for d in calling_users if d.emails[0] in mail_ids]
51
52    print("\nCalling Users in CI  - Count ", len(calling_users))
53    print("Mail IDs from input file after removing error columns - Count ", len(mail_ids))
54    print("FilteredUsers Users -  Count", len(filteredUsers))
55
56    def set_number_of_rings(user: Person):
57        """
58        Read VM config for a user
59        :param user: user to update
60        """
61        try:
62            # shortcut
63            vm = api.person_settings.voicemail
64
65            # Read Current configuration
66            vm_settings = vm.read(person_id=user.person_id)
67            print(f'\n Existing Configuration: {vm_settings} ')
68
69            # Modify number of rings value
70            vm_settings.send_unanswered_calls.number_of_rings = VOICEMAIL_SETTINGS_NUMBER_OF_RINGS
71            vm.configure(person_id=user.person_id, settings=vm_settings)
72            # Read configuration after changes
73            vm_settings = vm.read(person_id=user.person_id)
74            print(f'\n New Configuration: {vm_settings} ')
75            final_report.append((user.display_name, 'SUCCESS'))
76        except Exception as e:
77            final_report.append((user.display_name, 'FAILURE'))
78            print("type error: " + str(e))
79            print(traceback.format_exc())
80        return
81
82    # Modify settings for the filtered users
83    with ThreadPoolExecutor() as pool:
84        list(pool.map(lambda user: set_number_of_rings(user),
85                      filteredUsers))
86
87    print(final_report)
88    with open('output.csv', 'w') as f:
89        write = csv.writer(f)
90        write.writerow(["USERNAME", "STATUS"])
91        write.writerows(final_report)
92
93
94if __name__ == '__main__':
95    update_vm_settings()

Holiday schedule w/ US national holidays for all US locations

This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates a “National Holidays” holiday schedule for all US locations with all these national holidays.

A rudimentary API implementation in calendarific.py is used for the requests to https://calendarific.com/. Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get a free API account key which then is read from environment variable CALENDARIFIC_KEY.

Source: us_holidays.py

  1#!/usr/bin/env python
  2"""
  3Example script
  4Create a holiday schedule for all US locations with all national holidays
  5"""
  6
  7import logging
  8from collections import defaultdict
  9from concurrent.futures import ThreadPoolExecutor
 10from datetime import date
 11from threading import Lock
 12from typing import List
 13
 14from dotenv import load_dotenv
 15
 16from calendarific import CalendarifiyApi, Holiday
 17from wxc_sdk import WebexSimpleApi
 18from wxc_sdk.locations import Location
 19from wxc_sdk.all_types import ScheduleType, Event, Schedule
 20
 21log = logging.getLogger(__name__)
 22
 23# a lock per location to protect observe_in_location()
 24location_locks: dict[str, Lock] = defaultdict(Lock)
 25
 26# Use parallel threads for provisioning?
 27USE_THREADING = True
 28
 29# True: delete holiday schedule instead of creating one
 30CLEAN_UP = False
 31
 32# first and last year for which to create public holiday events
 33FIRST_YEAR = 2022
 34LAST_YEAR = 2024
 35
 36LAST_YEAR = not CLEAN_UP and LAST_YEAR or FIRST_YEAR
 37
 38
 39def observe_in_location(*, api: WebexSimpleApi, location: Location, holidays: List[Holiday]):
 40    """
 41    create/update a "National Holiday" schedule in one location
 42
 43    :param api: Webex api
 44    :type api: WebexSimpleApi
 45    :param location: location to work on
 46    :type location: Location
 47    :param holidays: list of holidays to observe
 48    :type holidays: List[Holiday]
 49    """
 50    # there should always only one thread messing with the holiday schedule of a location
 51    with location_locks[location.location_id]:
 52        year = holidays[0].date.year
 53        schedule_name = 'National Holidays'
 54
 55        # shortcut
 56        ats = api.telephony.schedules
 57
 58        # existing "National Holiday" schedule or None
 59        schedule = next((schedule
 60                         for schedule in ats.list(obj_id=location.location_id,
 61                                                  schedule_type=ScheduleType.holidays,
 62                                                  name=schedule_name)
 63                         if schedule.name == schedule_name),
 64                        None)
 65        if CLEAN_UP:
 66            if schedule:
 67                log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
 68                ats.delete_schedule(obj_id=location.location_id,
 69                                    schedule_type=ScheduleType.holidays,
 70                                    schedule_id=schedule.schedule_id)
 71            return
 72        if schedule:
 73            # we need the details: list response doesn't have events
 74            schedule = ats.details(obj_id=location.location_id,
 75                                   schedule_type=ScheduleType.holidays,
 76                                   schedule_id=schedule.schedule_id)
 77        # create list of desired schedule entries
 78        #   * one per holiday
 79        #   * only future holidays
 80        #   * not on a Sunday
 81        today = date.today()
 82        events = [Event(name=f'{holiday.name} {holiday.date.year}',
 83                        start_date=holiday.date,
 84                        end_date=holiday.date,
 85                        all_day_enabled=True)
 86                  for holiday in holidays
 87                  if holiday.date >= today and holiday.date.weekday() != 6]
 88
 89        if not schedule:
 90            # create new schedule
 91            log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
 92            if not events:
 93                log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
 94                return
 95            schedule = Schedule(name=schedule_name,
 96                                schedule_type=ScheduleType.holidays,
 97                                events=events)
 98            log.debug(
 99                f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
100                f'events')
101            schedule_id = ats.create(obj_id=location.location_id, schedule=schedule)
102            log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
103            return
104
105        # update existing schedule
106        with ThreadPoolExecutor() as pool:
107            # delete existing events in the past
108            to_delete = [event
109                         for event in schedule.events
110                         if event.start_date < today]
111            if to_delete:
112                log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
113                if USE_THREADING:
114                    list(pool.map(
115                        lambda event: ats.event_delete(obj_id=location.location_id,
116                                                       schedule_type=ScheduleType.holidays,
117                                                       schedule_id=schedule.schedule_id,
118                                                       event_id=event.event_id),
119                        to_delete))
120                else:
121                    for event in to_delete:
122                        ats.event_delete(obj_id=location.location_id,
123                                         schedule_type=ScheduleType.holidays,
124                                         schedule_id=schedule.schedule_id,
125                                         event_id=event.event_id)
126
127            # add events which don't exist yet
128            existing_dates = set(event.start_date
129                                 for event in schedule.events)
130            to_add = [event
131                      for event in events
132                      if event.start_date not in existing_dates]
133            if not to_add:
134                log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
135                return
136            log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
137            if USE_THREADING:
138                list(pool.map(
139                    lambda event: ats.event_create(
140                        obj_id=location.location_id,
141                        schedule_type=ScheduleType.holidays,
142                        schedule_id=schedule.schedule_id,
143                        event=event),
144                    to_add))
145            else:
146                for event in to_add:
147                    ats.event_create(
148                        obj_id=location.location_id,
149                        schedule_type=ScheduleType.holidays,
150                        schedule_id=schedule.schedule_id,
151                        event=event)
152        log.info(f'observe_in_location({location.name}, {year}): done.')
153    return
154
155
156def observe_national_holidays(*, api: WebexSimpleApi, locations: List[Location],
157                              year: int = None):
158    """
159    US national holidays for given locations
160
161    :param api: Webex api
162    :type api: WebexSimpleApi
163    :param locations: list of locations in which US national holidays should be observed
164    :type locations: List[Location]
165    :param year: year for national holidays. Default: current year
166    :type year: int
167    """
168    # default: this year
169    year = year or date.today().year
170
171    # get national holidays for specified year
172    holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')
173
174    # update holiday schedule for each location
175    with ThreadPoolExecutor() as pool:
176        if USE_THREADING:
177            list(pool.map(
178                lambda location: observe_in_location(api=api, location=location, holidays=holidays),
179                locations))
180        else:
181            for location in locations:
182                observe_in_location(api=api, location=location, holidays=holidays)
183    return
184
185
186if __name__ == '__main__':
187    # read dotenv which has some environment variables like Webex API token and Calendarify
188    # API key.
189    load_dotenv()
190
191    # enable logging
192    logging.basicConfig(level=logging.DEBUG,
193                        format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
194    logging.getLogger('urllib3').setLevel(logging.INFO)
195    logging.getLogger('wxc_sdk.rest').setLevel(logging.INFO)
196
197    # the actual action
198    with WebexSimpleApi(concurrent_requests=5) as wx_api:
199        # get all US locations
200        log.info('Getting locations...')
201        us_locations = [location
202                        for location in wx_api.locations.list()
203                        if location.address.country == 'US']
204
205        # set up location locks
206        # location_locks is a defaultdict -> accessing with all potential keys creates the locks
207        list(location_locks[loc.location_id] for loc in us_locations)
208
209        # create national holiday schedule for given year(s) and locations
210        if USE_THREADING:
211            with ThreadPoolExecutor() as pool:
212                list(pool.map(
213                    lambda year: observe_national_holidays(api=wx_api, year=year, locations=us_locations),
214                    range(FIRST_YEAR, LAST_YEAR + 1)))
215        else:
216            for year in range(FIRST_YEAR, LAST_YEAR + 1):
217                observe_national_holidays(api=wx_api, year=year, locations=us_locations)

Holiday schedule w/ US national holidays for all US locations (async variant)

This example uses the Calendarific API at https://calendarific.com/ to get a list of national US holidays and creates a “National Holidays” holiday schedule for all US locations with all these national holidays.

A rudimentary API implementation in calendarific.py is used for the requests to https://calendarific.com/. Calendarific APIs require all requests to be authenticated using an API key. You can sign up for a free account to get a free API account key which then is read from environment variable CALENDARIFIC_KEY.

Source: us_holidays_async.py

  1#!/usr/bin/env python
  2"""
  3Example script
  4Create a holiday schedule for all US locations with all national holidays
  5
  6Using the asyc SDK variant
  7"""
  8import asyncio
  9import functools
 10import logging
 11from collections import defaultdict
 12from datetime import date
 13from typing import List
 14
 15from dotenv import load_dotenv
 16
 17from calendarific import CalendarifiyApi, Holiday
 18from wxc_sdk.all_types import ScheduleType, Event, Schedule
 19from wxc_sdk.as_api import AsWebexSimpleApi
 20from wxc_sdk.locations import Location
 21
 22log = logging.getLogger(__name__)
 23
 24# a lock per location to protect observe_in_location()
 25location_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
 26
 27# Use parallel tasks for provisioning?
 28USE_TASKS = True
 29
 30# True: delete holiday schedule instead of creating one
 31CLEAN_UP = False
 32
 33# first and last year for which to create public holiday events
 34FIRST_YEAR = 2022
 35LAST_YEAR = 2024
 36
 37LAST_YEAR = not CLEAN_UP and LAST_YEAR or FIRST_YEAR
 38
 39
 40async def observe_in_location(*, api: AsWebexSimpleApi, location: Location, holidays: List[Holiday]):
 41    """
 42    create/update a "National Holiday" schedule in one location
 43
 44    :param api: Webex api
 45    :type api: WebexSimpleApi
 46    :param location: location to work on
 47    :type location: Location
 48    :param holidays: list of holidays to observe
 49    :type holidays: List[Holiday]
 50    """
 51    # there should always only one thread messing with the holiday schedule of a location
 52    async with location_locks[location.location_id]:
 53        year = holidays[0].date.year
 54        schedule_name = 'National Holidays'
 55
 56        # shortcut
 57        ats = api.telephony.schedules
 58
 59        # existing "National Holiday" schedule or None
 60        schedule = next((schedule
 61                         for schedule in await ats.list(obj_id=location.location_id,
 62                                                        schedule_type=ScheduleType.holidays,
 63                                                        name=schedule_name)
 64                         if schedule.name == schedule_name),
 65                        None)
 66        if CLEAN_UP:
 67            if schedule:
 68                log.info(f'Delete schedule {schedule.name} in location {schedule.location_name}')
 69                await ats.delete_schedule(obj_id=location.location_id,
 70                                          schedule_type=ScheduleType.holidays,
 71                                          schedule_id=schedule.schedule_id)
 72            return
 73        if schedule:
 74            # we need the details: list response doesn't have events
 75            schedule = await ats.details(obj_id=location.location_id,
 76                                         schedule_type=ScheduleType.holidays,
 77                                         schedule_id=schedule.schedule_id)
 78        # create list of desired schedule entries
 79        #   * one per holiday
 80        #   * only future holidays
 81        #   * not on a Sunday
 82        today = date.today()
 83        events = [Event(name=f'{holiday.name} {holiday.date.year}',
 84                        start_date=holiday.date,
 85                        end_date=holiday.date,
 86                        all_day_enabled=True)
 87                  for holiday in holidays
 88                  if holiday.date >= today and holiday.date.weekday() != 6]
 89
 90        if not schedule:
 91            # create new schedule
 92            log.debug(f'observe_in_location({location.name}, {year}): no existing schedule')
 93            if not events:
 94                log.info(f'observe_in_location({location.name}, {year}): no existing schedule, no events, done')
 95                return
 96            schedule = Schedule(name=schedule_name,
 97                                schedule_type=ScheduleType.holidays,
 98                                events=events)
 99            log.debug(
100                f'observe_in_location({location.name}, {year}): creating schedule "{schedule_name}" with {len(events)} '
101                f'events')
102            schedule_id = await ats.create(obj_id=location.location_id, schedule=schedule)
103            log.info(f'observe_in_location({location.name}, {year}): new schedule id: {schedule_id}, done')
104            return
105
106        # update existing schedule
107        # delete existing events in the past
108        to_delete = [event
109                     for event in schedule.events
110                     if event.start_date < today]
111        if to_delete:
112            log.debug(f'observe_in_location({location.name}, {year}): deleting {len(to_delete)} outdated events')
113            if USE_TASKS:
114                await asyncio.gather(*[ats.event_delete(obj_id=location.location_id,
115                                                        schedule_type=ScheduleType.holidays,
116                                                        schedule_id=schedule.schedule_id,
117                                                        event_id=event.event_id)
118                                       for event in to_delete])
119            else:
120                for event in to_delete:
121                    await ats.event_delete(obj_id=location.location_id,
122                                           schedule_type=ScheduleType.holidays,
123                                           schedule_id=schedule.schedule_id,
124                                           event_id=event.event_id)
125
126        # add events which don't exist yet
127        existing_dates = set(event.start_date
128                             for event in schedule.events)
129        to_add = [event
130                  for event in events
131                  if event.start_date not in existing_dates]
132        if not to_add:
133            log.info(f'observe_in_location({location.name}, {year}): no events to add, done.')
134            return
135        log.debug(f'observe_in_location({location.name}, {year}): creating {len(to_add)} new events.')
136        if USE_TASKS:
137            await asyncio.gather(*[ats.event_create(obj_id=location.location_id,
138                                                    schedule_type=ScheduleType.holidays,
139                                                    schedule_id=schedule.schedule_id,
140                                                    event=event)
141                                   for event in to_add])
142        else:
143            for event in to_add:
144                await ats.event_create(obj_id=location.location_id,
145                                       schedule_type=ScheduleType.holidays,
146                                       schedule_id=schedule.schedule_id,
147                                       event=event)
148        log.info(f'observe_in_location({location.name}, {year}): done.')
149    return
150
151
152async def observe_national_holidays(*, api: AsWebexSimpleApi, locations: List[Location],
153                                    year: int = None):
154    """
155    US national holidays for given locations
156
157    :param api: Webex api
158    :type api: WebexSimpleApi
159    :param locations: list of locations in which US national holidays should be observed
160    :type locations: List[Location]
161    :param year: year for national holidays. Default: current year
162    :type year: int
163    """
164    # default: this year
165    year = year or date.today().year
166
167    # get national holidays for specified year
168    loop = asyncio.get_running_loop()
169    # avoid sync all:
170    # holidays = CalendarifiyApi().holidays(country='US', year=year, holiday_type='national')
171    holidays = await loop.run_in_executor(None, functools.partial(CalendarifiyApi().holidays,
172                                                                  country='US', year=year, holiday_type='national'))
173
174    # update holiday schedule for each location
175    if USE_TASKS:
176        await asyncio.gather(*[observe_in_location(api=api, location=location, holidays=holidays)
177                               for location in locations])
178    else:
179        for location in locations:
180            await observe_in_location(api=api, location=location, holidays=holidays)
181    return
182
183
184if __name__ == '__main__':
185    # read dotenv which has some environment variables like Webex API token and Calendarify
186    # API key.
187    load_dotenv()
188
189    # enable logging
190    logging.basicConfig(level=logging.DEBUG,
191                        format='%(asctime)s %(levelname)s %(threadName)s %(name)s: %(message)s')
192    logging.getLogger('urllib3').setLevel(logging.INFO)
193    logging.getLogger('wxc_sdk.as_rest').setLevel(logging.INFO)
194
195    # the actual action
196    async def do_provision():
197
198        async with AsWebexSimpleApi(concurrent_requests=5) as wx_api:
199            # get all US locations
200            log.info('Getting locations...')
201            us_locations = [location
202                            for location in await wx_api.locations.list()
203                            if location.address.country == 'US']
204
205            # set up location locks
206            # location_locks is a defaultdict -> accessing with all potential keys creates the locks
207            list(location_locks[loc.location_id] for loc in us_locations)
208
209            # create national holiday schedule for given year(s) and locations
210            if USE_TASKS:
211                await asyncio.gather(*[observe_national_holidays(api=wx_api, year=year, locations=us_locations)
212                                       for year in range(FIRST_YEAR, LAST_YEAR + 1)])
213            else:
214                for year in range(FIRST_YEAR, LAST_YEAR + 1):
215                    await observe_national_holidays(api=wx_api, year=year, locations=us_locations)
216
217    asyncio.run(do_provision())

Persist tokens and obtain new tokens interactively

A typical problem specifically when creating CLI scripts is how to obtain valid access tokens for the API operations. If your code wants to invoke Webex REST APIs on behalf of a user then an integration is needed. The concepts of integrations are explained at the “Integrations” page on developer.cisco.com.

This example code shows how an OAUth Grant flow for an integration can be initiated from a script by using the Python webbrowser module and calling the open() method with the authorization URL of a given integration to open that URL in the system web browser. The user can then authenticate and grant access to the integration. In the last step of a successful authorization flow the web browser is redirected to the redirect_url of the integration.

The example code starts a primitive web server serving GET requests to http://localhost:6001/redirect. This URL has to be the redirect URL of the integration you create under My Webex Apps on developer.webex.com.

The sample script reads the integration parameters from environment variables (TOKEN_INTEGRATION_CLIENT_ID, TOKEN_INTEGRATION_CLIENT_SECRET, TOKEN_INTEGRATION_CLIENT_SCOPES). These variables can also be defined in get_tokens.env in the current directory:

# rename this to get_tokens.env and set values

# client ID for integration to be used.
TOKEN_INTEGRATION_CLIENT_ID=

# client secret of integration
TOKEN_INTEGRATION_CLIENT_SECRET=

# scopes to request. Use these scopes when creating the integration at developer.webex.com
# scopes can be in any form supported by wxc_sdk.scopes.parse_scopes()
TOKEN_INTEGRATION_CLIENT_SCOPES="spark:calls_write spark:kms spark:calls_read spark-admin:telephony_config_read spark:people_read"

The sample code persists the tokens in get_tokens.yml in the current directory. On startup the sample code tries to read tokens from that file. If needed a new access token is obtained using the refresh token.

An OAuth flow is only initiated if no (valid) tokens could be read from get_tokens.yml

Source: get_tokens.py

 1#!/usr/bin/env python
 2"""
 3Example script
 4read tokens from file or interactively obtain token by starting a local web server and open the authorization URL in
 5the local web browser
 6"""
 7import logging
 8import os
 9from typing import Optional
10
11from dotenv import load_dotenv
12
13from wxc_sdk import WebexSimpleApi
14from wxc_sdk.integration import Integration
15from wxc_sdk.scopes import parse_scopes
16from wxc_sdk.tokens import Tokens
17
18log = logging.getLogger(__name__)
19
20
21def env_path() -> str:
22    """
23    determine path for .env to load environment variables from
24
25    :return: .env file path
26    """
27    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
28
29
30def yml_path() -> str:
31    """
32    determine path of YML file to persist tokens
33
34    :return: path to YML file
35    :rtype: str
36    """
37    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
38
39
40def build_integration() -> Integration:
41    """
42    read integration parameters from environment variables and create an integration
43
44    :return: :class:`wxc_sdk.integration.Integration` instance
45    """
46    client_id = os.getenv('TOKEN_INTEGRATION_CLIENT_ID')
47    client_secret = os.getenv('TOKEN_INTEGRATION_CLIENT_SECRET')
48    scopes = parse_scopes(os.getenv('TOKEN_INTEGRATION_CLIENT_SCOPES'))
49    redirect_url = 'http://localhost:6001/redirect'
50    if not all((client_id, client_secret, scopes)):
51        raise ValueError('failed to get integration parameters from environment')
52    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
53                       redirect_url=redirect_url)
54
55
56def get_tokens() -> Optional[Tokens]:
57    """
58
59    Tokens are read from a YML file. If needed an OAuth flow is initiated.
60
61    :return: tokens
62    :rtype: :class:`wxc_sdk.tokens.Tokens`
63    """
64
65    integration = build_integration()
66    tokens = integration.get_cached_tokens_from_yml(yml_path=yml_path())
67    return tokens
68
69
70if __name__ == '__main__':
71    logging.basicConfig(level=logging.DEBUG)
72
73    # load environment variables from .env
74    path = env_path()
75    log.info(f'reading {path}')
76    load_dotenv(env_path())
77
78    tokens = get_tokens()
79
80    # use the tokens to get identity of authenticated user
81    api = WebexSimpleApi(tokens=tokens)
82    me = api.people.me()
83    print(f'authenticated as {me.display_name} ({me.emails[0]}) ')

Read/update call intercept settings of a user

usage: call_intercept.py [-h] [–token TOKEN] user_email [{on,off}]

positional arguments:
user_email email address of user
{on,off} operation to apply

options:
-h, –help show this help message and exit
–token TOKEN admin access token to use

The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or obtains tokens via an OAuth flow. For the last option the integration parameters are read from environment variables which can be set in a .env file

Source: call_intercept.py

  1#!/usr/bin/env python
  2"""
  3Script to read/update call intercept settings of a calling user.
  4
  5The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or
  6obtains tokens via an OAuth flow.
  7
  8    usage: call_intercept.py [-h] [--token TOKEN] user_email [{on,off}]
  9
 10    positional arguments:
 11      user_email     email address of user
 12      {on,off}       operation to apply
 13
 14    options:
 15      -h, --help     show this help message and exit
 16      --token TOKEN  admin access token to use
 17"""
 18import argparse
 19import logging
 20import os
 21import re
 22import sys
 23from json import loads
 24from typing import Optional
 25
 26from dotenv import load_dotenv
 27from wxc_sdk import WebexSimpleApi
 28from wxc_sdk.integration import Integration
 29from wxc_sdk.person_settings.call_intercept import InterceptSetting
 30from wxc_sdk.rest import RestError
 31from wxc_sdk.scopes import parse_scopes
 32from wxc_sdk.tokens import Tokens
 33from yaml import safe_dump, safe_load
 34
 35log = logging.getLogger(__name__)
 36
 37
 38def env_path() -> str:
 39    """
 40    determine path for .env to load environment variables from
 41
 42    :return: .env file path
 43    """
 44    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
 45
 46
 47def yml_path() -> str:
 48    """
 49    determine path of YML file to persist tokens
 50
 51    :return: path to YML file
 52    :rtype: str
 53    """
 54    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
 55
 56
 57def build_integration() -> Integration:
 58    """
 59    read integration parameters from environment variables and create an integration
 60
 61    :return: :class:`wxc_sdk.integration.Integration` instance
 62    """
 63    client_id = os.getenv('TOKEN_INTEGRATION_CLIENT_ID')
 64    client_secret = os.getenv('TOKEN_INTEGRATION_CLIENT_SECRET')
 65    scopes = os.getenv('TOKEN_INTEGRATION_CLIENT_SCOPES')
 66    if scopes:
 67        scopes = parse_scopes(scopes)
 68    if not all((client_id, client_secret, scopes)):
 69        raise ValueError('failed to get integration parameters from environment')
 70    redirect_url = 'http://localhost:6001/redirect'
 71    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
 72                       redirect_url=redirect_url)
 73
 74
 75def get_tokens() -> Optional[Tokens]:
 76    """
 77
 78    Tokens are read from a YML file. If needed an OAuth flow is initiated.
 79
 80    :return: tokens
 81    :rtype: :class:`wxc_sdk.tokens.Tokens`
 82    """
 83
 84    def write_tokens(tokens_to_cache: Tokens):
 85        with open(yml_path(), mode='w') as f:
 86            safe_dump(loads(tokens_to_cache.json()), f)
 87        return
 88
 89    def read_tokens() -> Optional[Tokens]:
 90        try:
 91            with open(yml_path(), mode='r') as f:
 92                data = safe_load(f)
 93                tokens_read = Tokens.model_validate(data)
 94        except Exception as e:
 95            log.info(f'failed to read tokens from file: {e}')
 96            tokens_read = None
 97        return tokens_read
 98
 99    integration = build_integration()
100    tokens = integration.get_cached_tokens(read_from_cache=read_tokens,
101                                           write_to_cache=write_tokens)
102    return tokens
103
104
105RE_EMAIL = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
106
107
108def email_type(value):
109    if not RE_EMAIL.match(value):
110        raise argparse.ArgumentTypeError(f"'{value}' is not a valid email")
111    return value
112
113
114def main():
115    """
116    where the magic happens
117    """
118    # read .env file with the settings for the integration to be used to obtain tokens
119    load_dotenv(env_path())
120
121    parser = argparse.ArgumentParser()
122    parser.add_argument('user_email', type=email_type, help='email address of user')
123    parser.add_argument('on_off', choices=['on', 'off'], nargs='?', help='operation to apply')
124    parser.add_argument('--token', type=str, required=False, help='admin access token to use')
125    args = parser.parse_args()
126
127    if args.token:
128        tokens = args.token
129    elif (tokens := os.getenv('WEBEX_ACCESS_TOKEN')) is None:
130        tokens = get_tokens()
131
132    if not tokens:
133        print('Failed to get tokens', file=sys.stderr)
134        exit(1)
135
136    # set level to DEBUG to see debug of REST requests
137    logging.basicConfig(level=(gt := getattr(sys, 'gettrace', None)) and gt() and logging.DEBUG or logging.INFO)
138
139    with WebexSimpleApi(tokens=tokens) as api:
140        # get user
141        email = args.user_email.lower()
142        user = next((user
143                     for user in api.people.list(email=email)
144                     if user.emails[0] == email), None)
145        if user is None:
146            print(f'User "{email}" not found', file=sys.stderr)
147            exit(1)
148
149        # display call intercept status
150        try:
151            intercept = api.person_settings.call_intercept.read(person_id=user.person_id)
152        except RestError as e:
153            print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}')
154            exit(1)
155
156        print('on' if intercept.enabled else 'off')
157        if args.on_off:
158            # action: turn on/off
159            intercept = InterceptSetting.default()
160            intercept.enabled = args.on_off == 'on'
161            try:
162                api.person_settings.call_intercept.configure(person_id=user.person_id,
163                                                             intercept=intercept)
164            except RestError as e:
165                print(f'Failed to update call intercept settings: {e.response.status_code}, {e.description}')
166                exit(1)
167
168            # read call intercept again
169            try:
170                intercept = api.person_settings.call_intercept.read(person_id=user.person_id)
171            except RestError as e:
172                print(f'Failed to read call intercept settings: {e.response.status_code}, {e.description}')
173                exit(1)
174
175            # display state after update
176            print(f"set to {'on' if intercept.enabled else 'off'}")
177
178    exit(0)
179
180
181if __name__ == '__main__':
182    main()

Read/update call queue agent join states

usage: queue_helper.py [-h] [–location LOCATION [LOCATION …]]
[–queue QUEUE [QUEUE …]]
[–join JOIN_AGENT [JOIN_AGENT …]]
[–unjoin UNJOIN_AGENT [UNJOIN_AGENT …]]
[–remove REMOVE_USER [REMOVE_USER …]]
[–add ADD_USER [ADD_USER …]] [–dryrun]
[–token TOKEN]

Modify call queue settings from the CLI

optional arguments:
-h, –help show this help message and exit
–location LOCATION [LOCATION …], -l LOCATION [LOCATION …]
name of location to work on. If missing then work on
all locations.
–queue QUEUE [QUEUE …], -q QUEUE [QUEUE …]
name(s) of queue(s) to operate on. If missing then
work on all queues in location.
–join JOIN_AGENT [JOIN_AGENT …], -j JOIN_AGENT [JOIN_AGENT …]
Join given user(s) on given queue(s). Can be “all” to
act on all agents.
–unjoin UNJOIN_AGENT [UNJOIN_AGENT …], -u UNJOIN_AGENT [UNJOIN_AGENT …]
Unjoin given agent(s) from given queue(s). Can be
“all” to act on all agents.
–remove REMOVE_USER [REMOVE_USER …], -r REMOVE_USER [REMOVE_USER …]
Remove given agent from given queue(s). Can be “all”
to act on all agents.
–add ADD_USER [ADD_USER …], -a ADD_USER [ADD_USER …]
Add given users to given queue(s).
–dryrun, -d Dry run; don’t apply any changes
–token TOKEN admin access token to use

The script uses the access token passed via the CLI, reads one from the WEBEX_ACCESS_TOKEN environment variable or obtains tokens via an OAuth flow. For the last option the integration parameters are read from environment variables which can be set in a queue_helper.env file in the current directory.

Source: queue_helper.py

  1#!/usr/bin/env python
  2"""
  3usage: queue_helper.py [-h] [--location LOCATION [LOCATION ...]] [--queue QUEUE [QUEUE ...]]
  4                       [--join JOIN_AGENT [JOIN_AGENT ...]] [--unjoin UNJOIN_AGENT [UNJOIN_AGENT ...]]
  5                       [--remove REMOVE_USER [REMOVE_USER ...]] [--add ADD_USER [ADD_USER ...]] [--dryrun]
  6                       [--token TOKEN]
  7
  8Modify call queue settings from the CLI
  9
 10optional arguments:
 11  -h, --help            show this help message and exit
 12  --location LOCATION [LOCATION ...], -l LOCATION [LOCATION ...]
 13                        name of location to work on. If missing then work on all locations.
 14  --queue QUEUE [QUEUE ...], -q QUEUE [QUEUE ...]
 15                        name(s) of queue(s) to operate on. If missing then work on all queues in location.
 16  --join JOIN_AGENT [JOIN_AGENT ...], -j JOIN_AGENT [JOIN_AGENT ...]
 17                        Join given user(s) on given queue(s). Can be "all" to act on all agents.
 18  --unjoin UNJOIN_AGENT [UNJOIN_AGENT ...], -u UNJOIN_AGENT [UNJOIN_AGENT ...]
 19                        Unjoin given agent(s) from given queue(s). Can be "all" to act on all agents.
 20  --remove REMOVE_USER [REMOVE_USER ...], -r REMOVE_USER [REMOVE_USER ...]
 21                        Remove given agent from given queue(s). Can be "all" to act on all agents.
 22  --add ADD_USER [ADD_USER ...], -a ADD_USER [ADD_USER ...]
 23                        Add given users to given queue(s).
 24  --dryrun, -d          Dry run; don't apply any changes
 25  --token TOKEN         admin access token to use
 26"""
 27import asyncio
 28import logging
 29import os
 30import sys
 31from argparse import ArgumentParser
 32from collections.abc import AsyncGenerator
 33from typing import Optional
 34
 35from dotenv import load_dotenv
 36
 37from wxc_sdk import Tokens
 38from wxc_sdk.as_api import AsWebexSimpleApi
 39from wxc_sdk.common import UserType
 40from wxc_sdk.integration import Integration
 41from wxc_sdk.people import Person
 42from wxc_sdk.scopes import parse_scopes
 43from wxc_sdk.telephony.callqueue import CallQueue
 44from wxc_sdk.telephony.hg_and_cq import Agent
 45
 46
 47def agent_name(agent: Agent) -> str:
 48    return f'{agent.first_name} {agent.last_name}'
 49
 50
 51def env_path() -> str:
 52    """
 53    determine path for .env to load environment variables from; based on name of this file
 54    :return: .env file path
 55    """
 56    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
 57
 58
 59def yml_path() -> str:
 60    """
 61    determine path of YML file to persist tokens
 62    :return: path to YML file
 63    :rtype: str
 64    """
 65    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
 66
 67
 68def build_integration() -> Integration:
 69    """
 70    read integration parameters from environment variables and create an integration
 71    :return: :class:`wxc_sdk.integration.Integration` instance
 72    """
 73    client_id = os.getenv('INTEGRATION_CLIENT_ID')
 74    client_secret = os.getenv('INTEGRATION_CLIENT_SECRET')
 75    scopes = parse_scopes(os.getenv('INTEGRATION_SCOPES'))
 76    redirect_url = 'http://localhost:6001/redirect'
 77    if not all((client_id, client_secret, scopes)):
 78        raise ValueError('failed to get integration parameters from environment')
 79    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
 80                       redirect_url=redirect_url)
 81
 82
 83def get_tokens() -> Optional[Tokens]:
 84    """
 85    Tokens are read from a YML file. If needed an OAuth flow is initiated.
 86
 87    :return: tokens
 88    :rtype: :class:`wxc_sdk.tokens.Tokens`
 89    """
 90
 91    integration = build_integration()
 92    tokens = integration.get_cached_tokens_from_yml(yml_path=yml_path())
 93    return tokens
 94
 95
 96async def main():
 97    async def act_on_queue(queue: CallQueue):
 98        """
 99        Act on a single queue
100        """
101        # we need the queue details b/c the queue instance passed as parameter is from a list() call
102        # ... and thus is missing all the details like agents
103        details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
104        agent_names = set(map(agent_name, details.agents))
105
106        def notify(message: str) -> str:
107            """
108            an action notification with queue information
109            """
110            return f'queue "{details.name:{queue_len}}" in "{queue.location_name:{location_len}}": {message}'
111
112        def validate_agents(names: list[str], operation: str) -> list[str]:
113            """
114            check if all names in given list exist as agents on current queue
115            """
116            if 'all' in names:
117                return set(agent_names)
118
119            not_found = [name for name in names if name not in agent_names]
120            if not_found:
121                print('\n'.join(notify(f'{name} not found for {operation}"')
122                                for name in not_found),
123                      file=sys.stderr)
124            return set(name for name in names if name not in set(not_found))
125
126        # validate list of names or join, unjoin, and remove against actual list of agents
127        to_join = validate_agents(join_agents, 'join')
128        to_unjoin = validate_agents(unjoin_agents, 'unjoin')
129        to_remove = validate_agents(remove_users, 'remove')
130
131        # check for agents we are asked to add but which already exist as agents on the queue
132        existing_agent_ids = set(agent.agent_id for agent in details.agents)
133        agent_exists = [agent_name(user) for user in add_users
134                        if user.person_id in existing_agent_ids]
135        if agent_exists:
136            print('\n'.join(notify(f'{name} already is agent')
137                            for name in agent_exists),
138                  file=sys.stderr)
139            # reduced set of users to add
140            to_add = [user for user in add_users
141                      if agent_name(user) not in set(agent_exists)]
142        else:
143            # ...  or add all users
144            to_add = add_users
145
146        # the updated list of agents for the current queue
147        new_agents = []
148
149        # do we actually need an update?
150        update_needed = False
151
152        # create copy of each agent instance; we don't want to update the original agent objects
153        # to make sure that details still holds the state before any update
154        agents = [agent.copy(deep=True) for agent in details.agents]
155
156        # iterate through the existing agents and see if we have to apply any change
157        for agent in agents:
158            name = agent_name(agent)
159            # do we have to take action to join this agent?
160            if name in to_join and not agent.join_enabled:
161                print(notify(f'{name}, join'))
162                update_needed = True
163                agent.join_enabled = True
164            # do we have to take action to unjoin this agent?
165            if name in to_unjoin and agent.join_enabled:
166                print(notify(f'{name}, unjoin'))
167                update_needed = True
168                agent.join_enabled = False
169            # do we have to remove this agent?
170            if name in to_remove:
171                print(notify(f'{name}, remove'))
172                update_needed = True
173                # skip to next agent; so that we don't add this agent to the updated list of agents
174                continue
175            new_agents.append(agent)
176
177        # add new agents
178        new_agents.extend(Agent(agent_id=user.person_id, user_type=UserType.people)
179                          for user in to_add)
180
181        # update the queue
182        if (update_needed or to_add) and not args.dryrun:
183            # simplified update: we only messed with the agents
184            update = CallQueue(agents=new_agents)
185            await api.telephony.callqueue.update(location_id=queue.location_id, queue_id=queue.id,
186                                                 update=update)
187            print(notify('queue updated'))
188            # and get details after the update
189            details = await api.telephony.callqueue.details(location_id=queue.location_id, queue_id=queue.id)
190            print(notify('got details after update'))
191
192        # print summary
193        print(f'queue "{queue.name:{queue_len}}" in "{queue.location_name}"')
194        print(f'  phone number: {details.phone_number}')
195        print(f'  extension: {details.extension}')
196        print('  agents')
197        if details.agents:
198            name_len = max(map(len, map(agent_name, details.agents)))
199            for agent in details.agents:
200                print(f'    {agent_name(agent):{name_len}}: {"not " if not agent.join_enabled else ""}joined')
201        return
202
203    async def validate_users(user_names: list[str]) -> AsyncGenerator[Person, None, None]:
204        """
205        Validate list of names of users to be added and yield a Person instance for each one
206        """
207        # search for all names in parallel
208        lists: list[list[Person]] = await asyncio.gather(
209            *[api.people.list(display_name=name) for name in user_names], return_exceptions=True)
210        for name, user_list in zip(user_names, lists):
211            if isinstance(user_list, Exception):
212                user = None
213            else:
214                user = next((u for u in user_list if name == agent_name(u)), None)
215            if user is None:
216                print(f'user "{name}" not found', file=sys.stderr)
217                continue
218            yield user
219        return
220
221    # parse command line
222    parser = ArgumentParser(description='Modify call queue settings from the CLI')
223    parser.add_argument('--location', '-l', type=str, required=False, nargs='+',
224                        help='name of location to work on. If missing then work on all locations.')
225
226    parser.add_argument('--queue', '-q', type=str, required=False, nargs='+',
227                        help='name(s) of queue(s) to operate on. If missing then work on all queues in location.')
228
229    parser.add_argument('--join', '-j', type=str, required=False, nargs='+', dest='join_agent',
230                        help='Join given user(s) on given queue(s). Can be "all" to act on all agents.')
231
232    parser.add_argument('--unjoin', '-u', type=str, required=False, nargs='+', dest='unjoin_agent',
233                        help='Unjoin given agent(s) from given queue(s). Can be "all" to act on all agents.')
234
235    parser.add_argument('--remove', '-r', type=str, required=False, nargs='+', dest='remove_user',
236                        help='Remove given agent from given queue(s). Can be "all" to act on all agents.')
237
238    parser.add_argument('--add', '-a', type=str, required=False, nargs='+', dest='add_user',
239                        help='Add given users to given queue(s).')
240    parser.add_argument('--dryrun', '-d', required=False, action='store_true',
241                        help='Dry run; don\'t apply any changes')
242    parser.add_argument('--token', type=str, required=False, help='admin access token to use')
243
244    args = parser.parse_args()
245
246    # get environment variables from .env; required for integration parameters
247    load_dotenv(env_path())
248
249    tokens = args.token or None
250    if tokens is None:
251        # get tokens from cache or create a new set of tokens using the integration defined in .env
252        tokens = get_tokens()
253
254    async with AsWebexSimpleApi(tokens=tokens) as api:
255        # validate location parameter
256        location_names = args.location or []
257
258        # list of all locations with names matching one of the provided names
259        locations = [loc for loc in await api.locations.list()
260                     if not location_names or loc.name in set(location_names)]
261
262        if not location_names:
263            print(f'Considering all {len(locations)} locations')
264
265        # set of names of matching locations
266        found_location_names = set(loc.name for loc in locations)
267
268        # Error message for each location name argument not matching an actual location
269        for location_name in location_names:
270            if location_name not in found_location_names:
271                print(f'location "{location_name}" not found', file=sys.stderr)
272
273        if not locations:
274            print('Found no locations to work on', file=sys.stderr)
275            exit(1)
276
277        # which queues do we need to operate on?
278        location_ids = set(loc.location_id for loc in locations)
279        queue_names = args.queue
280        all_queues = queue_names is None
281        # full list of queues
282        queues = await api.telephony.callqueue.list()
283        # filter based on location parameter
284        queues = [queue for queue in queues
285                  if (all_queues or queue.name in queue_names) and queue.location_id in location_ids]
286
287        # len of queue names for nicer output
288        queue_len = max(len(queue.name) for queue in queues)
289
290        # now we can actually go back and re-evaluate the list of locations; for the location length we only need
291        # to consider locations we actually have a target queue in
292        location_ids = set(queue.location_id for queue in queues)
293
294        # max length of location names for nicely formatted output
295        location_len = max(len(loc.name)
296                           for loc in locations
297                           if loc.location_id in location_ids)
298
299        # get the names for join, unjoin, remove, and add
300        join_agents = args.join_agent or []
301        unjoin_agents = args.unjoin_agent or []
302        remove_users = args.remove_user or []
303        add_users = args.add_user or []
304
305        # validate users; make sure that users exist with the provided names
306        add_users = [u async for u in validate_users(user_names=add_users)]
307
308        # apply actions to all queues
309        await asyncio.gather(*[act_on_queue(queue) for queue in queues])
310
311
312if __name__ == '__main__':
313    # enable DEBUG logging to a file; REST log shows all requests
314    logging.basicConfig(filename=os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.log'),
315                        filemode='w', level=logging.DEBUG)
316    asyncio.run(main())

Using service APP tokens to access a API endpoints

The script uses service app credentials to get an access token and then use this access token to call Webex Calling APIs.

Source: service_app.py

  1#!/usr/bin/env python
  2"""
  3Demo for Webex service app: using service APP tokens to access a API endpoints
  4"""
  5import logging
  6import sys
  7from json import dumps, loads
  8from os import getenv
  9from os.path import basename, splitext, isfile
 10from typing import Optional
 11
 12from dotenv import load_dotenv
 13from yaml import safe_load, safe_dump
 14
 15from wxc_sdk import WebexSimpleApi
 16from wxc_sdk.integration import Integration
 17from wxc_sdk.tokens import Tokens
 18
 19
 20def yml_path() -> str:
 21    """
 22    Get filename for YML file to cache access and refresh token
 23    """
 24    return f'{splitext(basename(__file__))[0]}.yml'
 25
 26
 27def env_path() -> str:
 28    """
 29    Get path to .env file to read service app settings from
 30    :return:
 31    """
 32    return f'{splitext(basename(__file__))[0]}.env'
 33
 34
 35def read_tokens_from_file() -> Optional[Tokens]:
 36    """
 37    Get service app tokens from cache file, return None if cache does not exist
 38    """
 39    path = yml_path()
 40    if not isfile(path):
 41        return None
 42    try:
 43        with open(path, mode='r') as f:
 44            data = safe_load(f)
 45        tokens = Tokens.model_validate(data)
 46    except Exception:
 47        return None
 48    return tokens
 49
 50
 51def write_tokens_to_file(tokens: Tokens):
 52    """
 53    Write tokens to cache
 54    """
 55    with open(yml_path(), mode='w') as f:
 56        safe_dump(tokens.model_dump(exclude_none=True), f)
 57
 58
 59def get_access_token() -> Tokens:
 60    """
 61    Get a new access token using refresh token, service app client id, service app client secret
 62    """
 63    tokens = Tokens(refresh_token=getenv('SERVICE_APP_REFRESH_TOKEN'))
 64    integration = Integration(client_id=getenv('SERVICE_APP_CLIENT_ID'),
 65                              client_secret=getenv('SERVICE_APP_CLIENT_SECRET'),
 66                              scopes=[], redirect_url=None)
 67    integration.refresh(tokens=tokens)
 68    write_tokens_to_file(tokens)
 69    return tokens
 70
 71
 72def get_tokens() -> Optional[Tokens]:
 73    """
 74    Get tokens from cache or create new access token using service app credentials
 75    """
 76    # try to read from file
 77    tokens = read_tokens_from_file()
 78    # .. or create new access token using refresh token
 79    if tokens is None:
 80        tokens = get_access_token()
 81    if tokens.remaining < 24 * 60 * 60:
 82        tokens = get_access_token()
 83    return tokens
 84
 85
 86def service_app():
 87    """
 88    Use service app access token to call Webex Calling API endpoints
 89    :return:
 90    """
 91    load_dotenv(env_path())
 92    # assert that all required environment variable are set
 93    if not all(getenv(s) for s in ('SERVICE_APP_REFRESH_TOKEN', 'SERVICE_APP_CLIENT_ID', 'SERVICE_APP_CLIENT_SECRET')):
 94        print(
 95            f'SERVICE_APP_REFRESH_TOKEN, SERVICE_APP_CLIENT_ID, and SERVICE_APP_CLIENT_SECRET need to be defined in '
 96            f'environment or in "{env_path()}"',
 97            file=sys.stderr)
 98        exit(1)
 99
100    # get tokens and dump to console
101    tokens = get_tokens()
102    print(dumps(loads(tokens.json()), indent=2))
103    print()
104    print('scopes:')
105    print('\n'.join(f' * {s}' for s in sorted(tokens.scope.split())))
106
107    # use tokens to access APIs
108    api = WebexSimpleApi(tokens=tokens)
109
110    users = list(api.people.list())
111    print(f'{len(users)} users')
112
113    queues = list(api.telephony.callqueue.list())
114    print(f'{len(queues)} call queues')
115
116
117if __name__ == '__main__':
118    logging.basicConfig(level=logging.DEBUG)
119    service_app()

Pool unassigned TNs on hunt groups to catch calls to unassigned TNs

This script looks for unassigned TNs and assigns them to HGs that are forwarded to the locations main number. The idea is to catch all incoming calls to unassigned TNs and handle them accordingly.

usage: catch_tns.py [-h] [–test] [–location LOCATION] [–token TOKEN]
[–cleanup]

optional arguments:
-h, –help show this help message and exit
–test test only; don’t actually apply any config
–location LOCATION Location to work on
–token TOKEN admin access token to use.
–cleanup remove all pooling HGs

Source: catch_tns.py

  1#!/usr/bin/env python
  2"""
  3This script looks for unassigned TNs and assigns them to HGs that are forwarded to the locations main number.
  4The idea is to catch all incoming calls to unassigned TNs and handle them accordingly
  5
  6    usage: catch_tns.py [-h] [--test] [--location LOCATION] [--token TOKEN]
  7                        [--cleanup]
  8
  9    optional arguments:
 10      -h, --help           show this help message and exit
 11      --test               test only; don't actually apply any config
 12      --location LOCATION  Location to work on
 13      --token TOKEN        admin access token to use.
 14      --cleanup            remove all pooling HGs
 15"""
 16import asyncio
 17import logging
 18import os
 19import sys
 20from argparse import ArgumentParser, Namespace
 21from collections import defaultdict
 22from operator import attrgetter
 23from typing import Optional, Union
 24
 25from dotenv import load_dotenv
 26
 27from wxc_sdk.as_api import AsWebexSimpleApi
 28from wxc_sdk.common import IdAndName, AlternateNumber, RingPattern
 29from wxc_sdk.integration import Integration
 30from wxc_sdk.scopes import parse_scopes
 31from wxc_sdk.telephony import NumberListPhoneNumber
 32from wxc_sdk.telephony.forwarding import CallForwarding
 33from wxc_sdk.telephony.huntgroup import HuntGroup
 34from wxc_sdk.tokens import Tokens
 35
 36POOL_HG_NAME = 'POOL_'
 37
 38
 39def env_path() -> str:
 40    """
 41    determine path for .env to load environment variables from; based on name of this file
 42    :return: .env file path
 43    """
 44    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.env')
 45
 46
 47def yml_path() -> str:
 48    """
 49    determine path of YML file to persist tokens
 50    :return: path to YML file
 51    :rtype: str
 52    """
 53    return os.path.join(os.getcwd(), f'{os.path.splitext(os.path.basename(__file__))[0]}.yml')
 54
 55
 56def build_integration() -> Integration:
 57    """
 58    read integration parameters from environment variables and create an integration
 59    :return: :class:`wxc_sdk.integration.Integration` instance
 60    """
 61    client_id = os.getenv('INTEGRATION_CLIENT_ID')
 62    client_secret = os.getenv('INTEGRATION_CLIENT_SECRET')
 63    scopes = parse_scopes(os.getenv('INTEGRATION_SCOPES'))
 64    redirect_url = 'http://localhost:6001/redirect'
 65    if not all((client_id, client_secret, scopes)):
 66        raise ValueError('failed to get integration parameters from environment')
 67    return Integration(client_id=client_id, client_secret=client_secret, scopes=scopes,
 68                       redirect_url=redirect_url)
 69
 70
 71def get_tokens() -> Optional[Tokens]:
 72    """
 73    Tokens are read from a YML file. If needed an OAuth flow is initiated.
 74
 75    :return: tokens
 76    :rtype: :class:`wxc_sdk.tokens.Tokens`
 77    """
 78
 79    integration = build_integration()
 80    tokens = integration.get_cached_tokens_from_yml(yml_path=yml_path())
 81    return tokens
 82
 83
 84async def location_id_from_args(api: AsWebexSimpleApi, args: Namespace) -> Optional[str]:
 85    """
 86    Get location id for --location parameter
 87    """
 88    if not args.location:
 89        return None
 90    location = next((loc
 91                     for loc in await api.locations.list(name=args.location)
 92                     if loc.name == args.location),
 93                    None)
 94    return location and location.location_id
 95
 96
 97async def cleanup(api: AsWebexSimpleApi, args: Namespace):
 98    """
 99    clean up (delete) pooling HGs
100    """
101    location_id = await location_id_from_args(api, args)
102    existing_hg_list = [hg for hg in await api.telephony.huntgroup.list(location_id=location_id)
103                        if hg.name.startswith(POOL_HG_NAME)]
104
105    async def delete_one(hg: HuntGroup):
106        if not args.test:
107            await api.telephony.huntgroup.delete_huntgroup(location_id=hg.location_id,
108                                                           huntgroup_id=hg.id)
109        print(f'Deleted HG "{hg.name}" in location "{hg.location_name}"')
110
111    await asyncio.gather(*[delete_one(hg)
112                           for hg in existing_hg_list])
113
114
115async def pool_tns_location(api: AsWebexSimpleApi, args: Namespace, location: IdAndName,
116                            tns: list[NumberListPhoneNumber]):
117    """
118    Pool unassigned TNs for a given location
119    """
120
121    async def add_to_hg(hg: Union[HuntGroup, str],
122                        tns: list[NumberListPhoneNumber]):
123        """
124        Add a bunch of TNs to given HG
125        :param hg: can be an existing HG or a name of a HG to be created
126        :param tns: list of TNs to be added to the HG
127        """
128        if isinstance(hg, str):
129            # create new HG
130            print(f'Creating HG "{hg}" in location "{location.name}" for: '
131                  f'{", ".join(tn.phone_number for tn in tns)}')
132            if args.test:
133                return
134            settings = HuntGroup.create(name=hg,
135                                        phone_number=tns[0].phone_number)
136            new_id = await api.telephony.huntgroup.create(location_id=location.id,
137                                                          settings=settings)
138            # Get details of new HG and also the forwarding settings
139            # * details are needed for the recursive call to add_to_hg .. in case we have more than one TN to add
140            # * ... and forwarding settings are needed b/c we want to set CFwdAll to location's main number
141            details, forwarding = await asyncio.gather(
142                api.telephony.huntgroup.details(location_id=location.id, huntgroup_id=new_id),
143                api.telephony.huntgroup.forwarding.settings(location_id=location.id, feature_id=new_id))
144            forwarding: CallForwarding
145            details: HuntGroup
146
147            # set call forwarding
148            print(f'HG "{hg}" in location "{location.name}": set CFwdAll to {main_number}')
149            forwarding.always.enabled = True
150            forwarding.always.destination = main_number
151            await api.telephony.huntgroup.forwarding.update(location_id=location.id, feature_id=new_id,
152                                                            forwarding=forwarding)
153            if len(tns) > 1:
154                # add the remaining as alternate numbers
155                await add_to_hg(hg=details, tns=tns[1:])
156            return
157
158        # add tns as alternate numbers
159        print(f'HG "{hg.name}" in location "{location.name}", adding: '
160              f'{", ".join(tn.phone_number for tn in tns)}')
161        if args.test:
162            return
163        # weirdly on GET alternate numbers are returned in alternate_number_settings ...
164        alternate_numbers = hg.alternate_number_settings.alternate_numbers
165        alternate_numbers.extend(AlternateNumber(phone_number=tn.phone_number,
166                                                 ring_pattern=RingPattern.normal)
167                                 for tn in tns)
168        # ... while for an update Wx expects the alternate numbers in an alternate_number attribute
169        update = HuntGroup(alternate_numbers=alternate_numbers)
170        await api.telephony.huntgroup.update(location_id=location.id,
171                                             huntgroup_id=hg.id,
172                                             update=update)
173
174        return
175
176    # if the list of available TNs includes the main number then something is wonky. The main number should be owned
177    # by something
178    main_number = next((tn for tn in tns if tn.main_number), None)
179    if main_number is not None:
180        print(f'Error: main number {main_number.phone_number} in location "{location.name}" is not assigned to '
181              f'anything!', file=sys.stderr)
182        return
183
184    # get main number of location
185    main_number = (await api.telephony.location.details(location_id=location.id)).calling_line_id.phone_number
186
187    # get list if "pool" HGs in location
188    existing_hg_list = [hg for hg in await api.telephony.huntgroup.list(location_id=location.id)
189                        if hg.name.startswith(POOL_HG_NAME)]
190
191    # we need the details for all HGs: list() response is missing the alternate number list
192    # get all HG details in parallel
193    existing_hg_list = await asyncio.gather(*[api.telephony.huntgroup.details(location_id=location.id,
194                                                                              huntgroup_id=hg.id)
195                                              for hg in existing_hg_list])
196    existing_hg_list: list[HuntGroup]
197
198    # start with an empty list of tasks
199    tasks = []
200
201    # assign TNs to existing "pool" HGs
202
203    # these are the HGs we can still assign some TNs to
204    hgs_with_open_slots = (hg
205                           for hg in existing_hg_list
206                           if len(hg.alternate_number_settings.alternate_numbers) < 10)
207    tns.sort(key=attrgetter('phone_number'))
208    while tns:
209        hg_with_open_slots = next(hgs_with_open_slots, None)
210        if hg_with_open_slots is None:
211            # no more HGs we can assign TNs to --> we are done here
212            break
213
214        # add some tns to this hg; hg can have max 10 alternate numbers
215        tns_to_add = 10 - len(hg_with_open_slots.alternate_number_settings.alternate_numbers)
216        tasks.append(add_to_hg(hg=hg_with_open_slots, tns=tns[:tns_to_add]))
217
218        # continue w/ remaining TNs
219        tns = tns[tns_to_add:]
220
221    # assign remaining TNs to new "pool" HGs
222    # .. in batches of 11
223    existing_names = set(hg.name for hg in existing_hg_list)
224    new_hg_names = (name
225                    for i in range(1, 1000)
226                    if (name := f'{POOL_HG_NAME}{i:03d}') not in existing_names)
227    while tns:
228        tasks.append(add_to_hg(hg=next(new_hg_names), tns=tns[:11]))
229        tns = tns[11:]
230
231    # Now run all tasks
232    await asyncio.gather(*tasks)
233    return
234
235
236async def pool_tns(api: AsWebexSimpleApi, args: Namespace):
237    """
238    Assign unassigned numbers to pool HGs
239    """
240    location_id = await location_id_from_args(api, args)
241
242    # get available TNs. If a location argument was present then limit to that location
243    numbers = await api.telephony.phone_numbers(available=True, location_id=location_id)
244
245    # we need to work on TNs by location ...
246    tns_by_location: dict[str, list[NumberListPhoneNumber]] = defaultdict(list)
247    # ... and we want to collect location information (specifically the name)
248    locations: dict[str, IdAndName] = dict()
249    for tn in numbers:
250        locations[tn.location.id] = tn.location
251        tns_by_location[tn.location.id].append(tn)
252
253    # work on all locations in parallel
254    await asyncio.gather(*[pool_tns_location(api=api, args=args,
255                                             location=locations[location_id], tns=tns)
256                           for location_id, tns in tns_by_location.items()])
257
258
259async def catch_tns():
260    """
261    Main async logic
262    """
263    parser = ArgumentParser()
264    parser.add_argument('--test', required=False, help='test only; don\'t actually apply any config',
265                        action='store_true')
266    parser.add_argument('--location', required=False, help='Location to work on', type=str)
267    parser.add_argument('--token', type=str, required=False, help='admin access token to use.')
268    parser.add_argument('--cleanup', required=False, help='remove all pooling HGs', action='store_true')
269    args = parser.parse_args()
270
271    load_dotenv(env_path())
272
273    tokens = args.token or None
274
275    if tokens is None:
276        # get tokens from cache or create a new set of tokens using the integration defined in .env
277        tokens = get_tokens()
278    async with AsWebexSimpleApi(tokens=tokens) as api:
279        if args.cleanup:
280            await cleanup(api, args)
281        else:
282            await pool_tns(api, args)
283
284
285if __name__ == '__main__':
286    logging.basicConfig(level=logging.INFO)
287    asyncio.run(catch_tns())

Downgrade room device workspaces from Webex Calling to free calling

This script looks for workspaces in a given location (or all workspaces) and downgrades them from Webex Calling to free calling.

usage: room_devices.py [-h] [–location LOCATION] [–wsnames WSNAMES] [–test] {show,clear}

CLI tool to manage room device calling entitlements

positional arguments:
{show,clear} show: show all room devices with their calling settings, clear: remove calling
license from devices

optional arguments:
-h, –help show this help message and exit
–location LOCATION work on devices in given location
–wsnames WSNAMES file name of a file with workspace names to operate on; one name per line
–test test run only

Source: room_devices.py

  1#!/usr/bin/env python
  2"""
  3    usage: room_devices.py [-h] [--location LOCATION] [--wsnames WSNAMES] [--test] {show,clear}
  4
  5    CLI tool to manage room device calling entitlements
  6
  7    positional arguments:
  8      {show,clear}         show: show all room devices with their calling settings, clear: remove calling
  9                           license from devices
 10
 11    optional arguments:
 12      -h, --help           show this help message and exit
 13      --location LOCATION  work on devices in given location
 14      --wsnames WSNAMES    file name of a file with workspace names to operate on; one name per line
 15      --test               test run only
 16
 17    The tool reads environment variables from room_devices.env:
 18        SERVICE_APP_CLIENT_ID=<clients id of a service app created on developer.webex.com>
 19        SERVICE_APP_CLIENT_SECRET=<clients secret of a service app created on developer.webex.com>
 20        SERVICE_APP_REFRESH_TOKEN=<refresh token of the service app obtained after the service app has been
 21                                        authorized for an org>
 22
 23    This information is used to obtain an access token required to authorize API access
 24                                        
 25    This is a super-set of the scopes the service app needs: 
 26        * spark-admin:workspaces_write
 27        * Identity:one_time_password
 28        * identity:placeonetimepassword_create
 29        * spark:people_read
 30        * spark-admin:workspace_locations_read
 31        * spark-admin:workspaces_read
 32        * spark:devices_write
 33        * spark:devices_read
 34        * spark:kms
 35        * spark-admin:devices_read
 36        * spark-admin:workspace_locations_write
 37        * spark-admin:licenses_read
 38        * spark-admin:telephony_config_read
 39        * spark-admin:telephony_config_write
 40        * spark-admin:devices_write
 41        * spark-admin:people_read
 42
 43    More service app details: https://developer.webex.com/docs/service-apps
 44
 45    Tokens get persisted in room_devices.yml.
 46"""
 47import asyncio
 48import logging
 49import sys
 50import time
 51from argparse import ArgumentParser
 52from collections import defaultdict
 53from functools import reduce
 54from itertools import chain
 55from os import getenv, getcwd
 56from os.path import splitext, basename, isfile, join
 57from typing import Optional
 58
 59from dotenv import load_dotenv
 60from yaml import safe_load, safe_dump
 61
 62from wxc_sdk.as_api import AsWebexSimpleApi
 63from wxc_sdk.base import webex_id_to_uuid
 64from wxc_sdk.common import OwnerType
 65from wxc_sdk.devices import Device
 66from wxc_sdk.integration import Integration
 67from wxc_sdk.locations import Location
 68from wxc_sdk.telephony import NumberListPhoneNumber
 69from wxc_sdk.tokens import Tokens
 70from wxc_sdk.workspace_locations import WorkspaceLocation
 71from wxc_sdk.workspaces import Workspace, WorkspaceSupportedDevices, CallingType, WorkspaceCalling
 72
 73
 74def yml_path() -> str:
 75    """
 76    Get filename for YML file to cache access and refresh token
 77    """
 78    return f'{splitext(basename(__file__))[0]}.yml'
 79
 80
 81def env_path() -> str:
 82    """
 83    Get path to .env file to read service app settings from
 84    """
 85    return f'{splitext(basename(__file__))[0]}.env'
 86
 87
 88def read_tokens_from_file() -> Optional[Tokens]:
 89    """
 90    Get service app tokens from cache file, return None if cache does not exist or read fails
 91    """
 92    path = yml_path()
 93    if not isfile(path):
 94        return None
 95    try:
 96        with open(path, mode='r') as f:
 97            data = safe_load(f)
 98        tokens = Tokens.model_validate(data)
 99    except Exception:
100        return None
101    return tokens
102
103
104def write_tokens_to_file(tokens: Tokens):
105    """
106    Write tokens to cache
107    """
108    with open(yml_path(), mode='w') as f:
109        safe_dump(tokens.model_dump(exclude_none=True), f)
110
111
112def get_access_token() -> Tokens:
113    """
114    Get a new access token using refresh token, service app client id, service app client secret
115    """
116    tokens = Tokens(refresh_token=getenv('SERVICE_APP_REFRESH_TOKEN'))
117    integration = Integration(client_id=getenv('SERVICE_APP_CLIENT_ID'),
118                              client_secret=getenv('SERVICE_APP_CLIENT_SECRET'),
119                              scopes=[], redirect_url=None)
120    integration.refresh(tokens=tokens)
121    write_tokens_to_file(tokens)
122    return tokens
123
124
125def get_tokens() -> Optional[Tokens]:
126    """
127    Get tokens from cache or create new access token using service app credentials
128    """
129    # try to read from file
130    tokens = read_tokens_from_file()
131    # .. or create new access token using refresh token
132    if tokens is None:
133        tokens = get_access_token()
134    if tokens.remaining < 24 * 60 * 60:
135        tokens = get_access_token()
136    return tokens
137
138
139def main() -> int:
140    """
141    Main code
142    """
143    # parse args
144    parser = ArgumentParser(prog=basename(__file__), description='CLI tool to manage room device calling entitlements')
145    parser.add_argument('operation', choices=['show', 'clear'], help='show: show all room devices with their calling '
146                                                                     'settings, clear: remove calling license from '
147                                                                     'devices')
148    parser.add_argument('--location', type=str, help='work on devices in given location')
149    parser.add_argument('--wsnames', type=str, help='file name of a file with workspace names to operate on; '
150                                                    'one name per line')
151    parser.add_argument('--test', action='store_true', help='test run only')
152    args = parser.parse_args()
153    operation = args.operation
154    test_run = args.test
155    location = args.location
156    ws_names = args.wsnames
157
158    # get tokens; as an alternative you can just get a developer token from developer.webex.com and use:
159    #   tokens = '<developer token from developer.webex.com>'
160    load_dotenv(dotenv_path=env_path())
161    err = ''
162    tokens = None
163    try:
164        tokens = get_tokens()
165    except Exception as e:
166        err = f'{e}'
167    if not tokens:
168        print(f'failed to obtain access tokens: {err}', file=sys.stderr)
169        return 1
170
171    async def as_main() -> int:
172        """
173        Async main to be able to use concurrency
174        """
175
176        async def downgrade_workspace(ws: Workspace):
177            """
178            Downgrade one workspace to free calling
179            """
180
181            def log(s: str, file=sys.stdout):
182                print(f'downgrade workspace "{ws.display_name:{ws_name_len}}": {s}', file=file)
183
184            if ws.calling.type != CallingType.webex:
185                raise ValueError(f'calling type is "{ws.calling.type}", not "{CallingType.webex.value}"')
186            if test_run:
187                log('skipping update, test run only')
188            else:
189                log('updating calling settings')
190                update = ws.model_copy(deep=True)
191                update.calling = WorkspaceCalling(type=CallingType.free)
192                await api.workspaces.update(workspace_id=ws.workspace_id, settings=update)
193            log('done')
194
195        async with AsWebexSimpleApi(tokens=tokens) as api:
196
197            # get list of locations and workspace locations
198            ws_location_list, location_list = await asyncio.gather(
199                api.workspace_locations.list(display_name=location),
200                api.locations.list(name=location))
201            location_list: list[Location]
202            ws_location_list: list[WorkspaceLocation]
203
204            # validate location argument
205            if location:
206                target_location = next((loc for loc in location_list if loc.name == location), None)
207                target_ws_location = next((loc for loc in ws_location_list if loc.display_name == location), None)
208                if not all((target_ws_location, target_location)):
209                    print(f'location "{location}" not found', file=sys.stderr)
210                    return 1
211            else:
212                target_location = None
213                target_ws_location = None
214
215            # get workspaces, numbers, and devices (in target location)
216            workspaces, numbers, devices = await asyncio.gather(
217                api.workspaces.list(workspace_location_id=target_ws_location and target_ws_location.id),
218                api.telephony.phone_numbers(location_id=target_location and target_location.location_id,
219                                            owner_type=OwnerType.place),
220                api.devices.list(workspace_location_id=target_ws_location and target_ws_location.id,
221                                 product_type='roomdesk')
222            )
223            workspaces: list[Workspace]
224            numbers: list[NumberListPhoneNumber]
225            devices: list[Device]
226
227            # only workspaces supporting desk devices
228            workspaces = [ws for ws in workspaces
229                          if ws.supported_devices == WorkspaceSupportedDevices.collaboration_devices]
230
231            # if a path to a file with workspace names was given, then filter based on the file contents
232            if ws_names:
233                with open(ws_names, mode='r') as f:
234                    workspace_names = set(s_line for line in f if (s_line := line.strip()))
235                workspaces = [ws for ws in workspaces
236                              if ws.display_name in workspace_names]
237            if not workspaces:
238                print('No workspaces', file=sys.stderr)
239                return 1
240
241            # only devices in workspaces (no personal devices)
242            devices = [d for d in devices if d.workspace_id is not None]
243
244            # prepare some lookups
245            workspace_locations_by_id: dict[str, WorkspaceLocation] = {wsl.id: wsl for wsl in ws_location_list}
246            numbers_by_workspace_uuid: dict[str, list[NumberListPhoneNumber]] = reduce(
247                lambda r, el: r[webex_id_to_uuid(el.owner.owner_id)].append(el) or r,
248                numbers,
249                defaultdict(list))
250            devices_by_workspace_id: dict[str, list[Device]] = reduce(
251                lambda r, el: r[el.workspace_id].append(el) or r,
252                devices,
253                defaultdict(list))
254
255            # sort workspaces by workspace location name and workspace name; workspace location can be unset
256            workspaces.sort(key=lambda ws: ('' if not ws.workspace_location_id else
257                                            workspace_locations_by_id[ws.workspace_location_id].display_name,
258                                            ws.display_name))
259            # some field lengths for nicer output
260            wsl_name_len = max(len(wsl.display_name) for wsl in ws_location_list)
261            ws_name_len = max(len(ws.display_name) for ws in workspaces)
262
263            # ... chain([1], ...) to avoid max() on empty sequence
264            pn_len = max(chain([1], (len(n.phone_number) for n in numbers if n.phone_number)))
265            ext_len = max(chain([1], (len(n.extension) for n in numbers if n.extension)))
266
267            # print workspaces with workspace locations, numbers, and devices
268            for workspace in workspaces:
269                if not workspace.workspace_location_id:
270                    wsl_name = ''
271                else:
272                    wsl_name = workspace_locations_by_id[workspace.workspace_location_id].display_name
273                print(f'workspace location "{wsl_name:{wsl_name_len}}", '
274                      f'workspace "{workspace.display_name:{ws_name_len}}"')
275
276                # are there any numbers in that workspace?
277                numbers = numbers_by_workspace_uuid.get(webex_id_to_uuid(workspace.workspace_id))
278                if numbers:
279                    for number in numbers:
280                        print(f'  number: {number.phone_number or "-" * pn_len:{pn_len}}/'
281                              f'{number.extension or "-" * ext_len:{ext_len}}')
282                devices = devices_by_workspace_id.get(workspace.workspace_id)
283                if devices:
284                    for device in devices:
285                        print(f'  device: {device.display_name}')
286
287            if operation == 'show':
288                # we are done here
289                return 0
290
291            # now we want to downgrade (disable calling) on all workspaces
292            print()
293            print('Starting downgrade')
294            results = await asyncio.gather(*[downgrade_workspace(ws) for ws in workspaces], return_exceptions=True)
295
296            # print errors ... if any
297            for ws, result in zip(workspaces, results):
298                ws: Workspace
299                if isinstance(result, Exception):
300                    print(f'Failed to downgrade "{ws.display_name:{ws_name_len}}": {result}', file=sys.stderr)
301
302            if any(isinstance(r, Exception) for r in results):
303                return 1
304            return 0
305
306    return asyncio.run(as_main())
307
308
309if __name__ == '__main__':
310    root_logger = logging.getLogger()
311    h = logging.StreamHandler(stream=sys.stderr)
312    h.setLevel(logging.INFO)
313    root_logger.setLevel(logging.INFO)
314    root_logger.addHandler(h)
315
316    # log REST API interactions to file
317    file_fmt = logging.Formatter(fmt='%(asctime)s %(levelname)s %(message)s')
318    file_fmt.converter = time.gmtime
319
320    rest_log_name = join(getcwd(), f'{splitext(basename(__file__))[0]}.log')
321    rest_log_handler = logging.FileHandler(rest_log_name, mode='w')
322    rest_log_handler.setLevel(logging.DEBUG)
323    rest_log_handler.setFormatter(file_fmt)
324    rest_logger = logging.getLogger('wxc_sdk.as_rest')
325    rest_logger.setLevel(logging.DEBUG)
326    rest_logger.addHandler(rest_log_handler)
327
328    exit(main())