Skip to content
Snippets Groups Projects
Commit 13ec4c77 authored by Marta Vila Fernandes's avatar Marta Vila Fernandes :game_die:
Browse files

redo the code: get the info per tuples and ip

parent a31219f2
No related branches found
No related tags found
1 merge request: !11 "redo the code: store the info of tuples (IP, OS, ACCTYPE) and then summarize info per IP"
Pipeline #8240199 waiting for manual action
......@@ -38,13 +38,13 @@ def setup_query(os_tags, stime, etime):
etime = etime.strftime("%Y-%m-%dT%H:%M:%S")
if 'all' in os_tags:
query = {'version': True, 'size': 10000,
'_source': ['metadata.timestamp', 'data.os_source', 'data.os_arch', 'data.path', 'data.clientip', 'data.puppet_managed'], 'query': {
'_source': ['metadata.timestamp', 'data.os_source', 'data.os_arch', 'data.path', 'data.clientip', 'data.puppet_managed', 'data.acctype'], 'query': {
'bool': {'filter': [OS_EXISTS, APACHE_RESPONSE, REPO_PATH, CERN_IP_ADD,
{'range': {'metadata.timestamp': {'gte': stime, 'lte': etime, 'format': "yyyy-MM-dd'T'HH:mm:ss"}}}]}}}
else:
q_os_tags = os_tags_query(os_tags)
query = {'version': True, 'size': 10000,
'_source': ['metadata.timestamp', 'data.os_source', 'data.os_arch', 'data.path', 'data.clientip', 'data.puppet_managed'], 'query': {
'_source': ['metadata.timestamp', 'data.os_source', 'data.os_arch', 'data.path', 'data.clientip', 'data.puppet_managed', 'data.acctype'], 'query': {
'bool': {'filter': [OS_EXISTS, APACHE_RESPONSE, REPO_PATH, CERN_IP_ADD, q_os_tags,
{'range': {'metadata.timestamp': {'gte': stime, 'lte': etime, 'format': "yyyy-MM-dd'T'HH:mm:ss"}}}]}}}
return query
......@@ -69,10 +69,13 @@ def get_data_from_os(client, query):
print(f'Total number of hits: {counter}')
def get_unique_ips(client, query, puppetmanaged, hosttype, date):
""" Get the unique IPs from Opensearch"""
def get_unique_tuples(client, query, puppetmanaged, hosttype, date):
""" Get the unique tuples (IP, OS, ACCTYPE) from Opensearch
And a dictionary with the unique ips, with the tuples associated to
"""
t0 = time.time()
print('Running Opensearch query')
dict_tuples = {}
dict_ips = {}
arch_list = ['aarch64', 'x86_64', 'ppc64le', 'i386']
for hit in get_data_from_os(client, query):
......@@ -86,20 +89,51 @@ def get_unique_ips(client, query, puppetmanaged, hosttype, date):
os_arch = os_arch[0]
else:
os_arch = None
if ipadd not in dict_ips.keys():
tuple_ip_os_acctype = (ipadd, data["os_source"], data["acctype"])
if tuple_ip_os_acctype not in dict_tuples.keys():
if puppet_managed == puppetmanaged or hosttype=='all':
dict_ips[ipadd] = {'IP': [ipadd],
dict_tuples[tuple_ip_os_acctype] = {'IP': [ipadd],
'PuppetManaged': puppet_managed,
'OS': data["os_source"],
'ARCH': os_arch,
'Requests': 1,
'@timestamp': date}
'@timestamp': date,
'ACCTYPE': data["acctype"]}
else:
dict_tuples[tuple_ip_os_acctype]['Requests'] += 1
# Create a dictionary by ip that includes the list of the tuples.
if ipadd not in dict_ips.keys():
dict_ips[ipadd] = [tuple_ip_os_acctype]
else:
dict_ips[ipadd]['Requests'] += 1
if tuple_ip_os_acctype not in dict_ips[ipadd]:
dict_ips[ipadd].append(tuple_ip_os_acctype)
print(f'Total time: {time.time() - t0}')
print('Dict of Tuples:')
print(dict_tuples)
print('Dict of unique IPs:')
print(dict_ips)
return dict_ips
return dict_tuples, dict_ips
def get_os_per_number_requests(dict_tuples, dict_ips):
    """Pick, for every IP, the winning (IP, OS, ACCTYPE) record.

    The winner is the tuple with the highest 'Requests' count among the
    tuples recorded for that IP (linuxsoft repository accesses).

    :param dict_tuples: maps (IP, OS, ACCTYPE) tuples to their info dicts
                        (each holding at least a 'Requests' counter)
    :param dict_ips: maps each IP to the list of its recorded tuples
    :return: dict mapping IP -> info dict of its most-requested tuple
    """
    started = time.time()
    winners = {}
    for ip, candidate_tuples in dict_ips.items():
        top_requests = 0
        top_record = {}
        # Linear arg-max over the IP's tuples; ties keep the first seen,
        # and a (theoretical) all-zero 'Requests' list yields {}.
        for candidate in candidate_tuples:
            record = dict_tuples[candidate]
            if record['Requests'] > top_requests:
                top_requests = record['Requests']
                top_record = record
        winners[ip] = top_record
    print(f'Total time: {time.time() - started}')
    print('Dict of unique IPs and their OS:')
    # print(winners)
    return winners
def get_landb_client():
......@@ -122,7 +156,7 @@ def get_landb_client():
client.set_options(soapheaders=authenticationHeader)
return client
def get_landb_owners(cern_ips, dir_path, check_landb_file=False):
def get_landb_owners(cern_ips, dir_path, check_landb_file=True):
"""Get the LanDB owners or groups given a list of IP addresses"""
t0 = time.time()
print('Running LanDB')
......@@ -207,35 +241,56 @@ def get_landb_owners(cern_ips, dir_path, check_landb_file=False):
with open(f"{dir_path}/landb_data.json", "w") as outfile:
outfile.write(json_object)
print(f'Total time: {time.time() - t0}')
print('Dict of LanDB info:')
print(landb_dict)
#print('Dict of LanDB info:')
#print(landb_dict)
return landb_dict
def aggregate_info(landb_dict, os_dict, tuples=False):
    """Aggregate Opensearch and LanDB information.

    :param landb_dict: maps IP address -> LanDB info dict (must contain
                       a 'Hostname' key)
    :param os_dict: when tuples=False, maps IP -> per-IP stats dict
                    ('Requests', 'OS', 'ACCTYPE', 'IP' list);
                    when tuples=True, maps (IP, OS, ACCTYPE) -> stats dict
    :param tuples: select the aggregation key: (Hostname, OS, ACCTYPE)
                   tuples (True) or plain hostnames (False)
    :return: dict keyed by (Hostname, OS, ACCTYPE) tuple or by hostname,
             with merged LanDB + Opensearch data and summed/maximal
             request counts
    """
    t0 = time.time()
    print('Aggregating OS and LanDB info')
    cern_dict = {}
    if tuples:
        for tuple_ip_os_acc in os_dict:
            ip_os_acc_data = os_dict[tuple_ip_os_acc]
            ipadd = tuple_ip_os_acc[0]
            try:
                # Copy so the merge below never mutates the caller's
                # landb_dict entries (they are reused across calls).
                landb_data = landb_dict[ipadd].copy()
            except KeyError:
                # IP unknown to LanDB: nothing to aggregate for it.
                continue
            hostname = landb_data['Hostname']
            new_tuple = (hostname, tuple_ip_os_acc[1], tuple_ip_os_acc[2])
            if new_tuple in cern_dict:
                # Same (host, OS, acctype) seen from another IP: sum the
                # requests and record the extra IP once.
                cern_dict[new_tuple]['Requests'] += ip_os_acc_data['Requests']
                if ipadd not in cern_dict[new_tuple]['IP']:
                    cern_dict[new_tuple]['IP'].append(ipadd)
            else:
                landb_data.update(ip_os_acc_data)
                cern_dict[new_tuple] = landb_data
    else:
        for ipadd in landb_dict:
            # Copy for the same reason as above: keep landb_dict pristine.
            landb_data = landb_dict[ipadd].copy()
            os_data = os_dict[ipadd]
            hostname = landb_data['Hostname']
            if hostname in cern_dict:
                # Keep the stats of the IP with the most requests as the
                # host's representative OS/acctype...
                if os_data['Requests'] > cern_dict[hostname]['Requests']:
                    cern_dict[hostname]['Requests'] = os_data['Requests']
                    cern_dict[hostname]['OS'] = os_data['OS']
                    cern_dict[hostname]['ACCTYPE'] = os_data['ACCTYPE']
                # ...but always remember every IP of the host.
                cern_dict[hostname]['IP'].append(ipadd)
            else:
                landb_data.update(os_data)
                # Own the IP list: later appends must not leak back into
                # the caller's os_dict entry.
                landb_data['IP'] = list(os_data['IP'])
                cern_dict[hostname] = landb_data
    print(f'Total time: {time.time() - t0}')
    #print('Distribution stats info:')
    #print(cern_dict)
    return cern_dict
def bulk_docs(client, data_dict, number_actions=1000):
def bulk_docs(client, data_dict, indexname, number_actions=1000):
"""Bulk data to Opensearch"""
print('Bulk data to Opensearch')
t0 = time.time()
indexname = 'linux_private-lxsoft-stats'
bulk_data = {"_op_type": "create", "_index": indexname}
actions = []
for hostname in data_dict.keys():
......@@ -257,7 +312,7 @@ def bulk_docs(client, data_dict, number_actions=1000):
def os_add_doc_index(client, document):
"""Add a document to the index."""
indexname = 'linux_private-lxsoft-stats'
indexname = 'lxsoft-distributions_stats'
response = client.index(
index=indexname,
body=document,
......
......@@ -6,8 +6,8 @@ import json
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_kerberos import HTTPKerberosAuth, OPTIONAL
from utils import setup_times, setup_query
from utils import get_landb_owners , get_unique_ips
from utils import aggregate_info, os_add_doc_index, bulk_docs
from utils import get_landb_owners , get_unique_tuples, get_os_per_number_requests
from utils import aggregate_info, bulk_docs
def parse_args():
""" define command line arguments"""
......@@ -134,22 +134,30 @@ def main():
if args.landb:
# setup the query file with given time ranges
query = setup_query(args.os_tags, stime, etime)
# get the data from Opensearch
cern_ips = get_unique_ips(client, query, puppetmanaged, args.hosttype, etime.strftime("%Y-%m-%dT%H:%M:%S"))
print(f"""INFO: There are {len(cern_ips.keys())} unique ips
# get the data from Opensearch - tuples (IP, OS, ACCTYPE), and unique IPs
dict_tuples, dict_ips = get_unique_tuples(client, query, puppetmanaged, args.hosttype, etime.strftime("%Y-%m-%dT%H:%M:%S"))
print(f"""INFO: There are {len(dict_tuples.keys())} unique tuples
that have accessed linuxsoft from {stime} to {etime}""")
print(f"""INFO: There are {len(dict_ips.keys())} unique ips
that have accessed linuxsoft from {stime} to {etime}""")
# get the OS of each IP
cern_ips_os = get_os_per_number_requests(dict_tuples, dict_ips)
print(f"""INFO: There are {len(cern_ips_os.keys())} unique ips
that have accessed linuxsoft from {stime} to {etime}""")
# get landb information
landb_dict = get_landb_owners(cern_ips.keys(), args.dir_path)
landb_dict = get_landb_owners(cern_ips_os.keys(), args.dir_path)
# Agggregate Opensearch info with LanDB info
data_dict = aggregate_info(landb_dict, cern_ips)
#for hostname in data_dict:
# os_add_doc_index(client, data_dict[hostname])
data_dict_all = aggregate_info(landb_dict, dict_tuples, tuples=True)
data_dict_resume = aggregate_info(landb_dict, cern_ips_os)
# Bulk documents to the index
bulk_docs(client, data_dict, number_actions=1000)
indexname_diststats = 'lxsoft-distributions_stats'
bulk_docs(client, data_dict_resume, indexname_diststats, number_actions=1000)
indexname_tuples = 'lxsoft-distributions_all_access'
bulk_docs(client, data_dict_all, indexname_tuples, number_actions=1000)
if args.dir_path:
# Serializing json
json_object = json.dumps(data_dict, indent=4)
json_object = json.dumps(data_dict_resume, indent=4)
# Writing to a json file
with open(f"{args.dir_path}/stats_{str(stime.date())}.json", "w") as outfile:
outfile.write(json_object)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment