//-< DATABASE.CXX >--------------------------------------------------*--------* // GOODS Version 1.0 (c) 1997 GARRET * ? * // (Generic Object Oriented Database System) * /\| * // * / \ * // Created: 7-Jan-97 K.A. Knizhnik * / [] \ * // Last update: 16-Sep-97 K.A. Knizhnik * GARRET * //-------------------------------------------------------------------*--------* // Application specific database interface //-------------------------------------------------------------------*--------* #include "goods.h" #ifdef _WIN32 #pragma hdrstop #endif #include "client.h" BEGIN_GOODS_NAMESPACE #define DNM_BUFFER_WATERMARK 1024*1024 // // All object storage methods are called from metaobjects // with global mutex locked. Before locking storage critical section // global mutex should be unlocked // // // This function is called while transaction is commited // to assign persistent class identifier to object which is made persistent // by this transaction. // cpid_t obj_storage::get_cpid_by_descriptor(class_descriptor* desc) { cpid_t cpid = cpid_table[desc->ctid]; if (cpid == 0) { object_monitor::unlock_global(); // wait response from server cpid = storage->put_class(desc->dbs_desc); object_monitor::lock_global(); cpid_table[desc->ctid] = cpid; descriptor_table[cpid] = desc; } return cpid; } // // Called by smart pointers to load object instance from server // to client's object cache. Global mutex is locked but should be unlocked // to make possible for other threads to continue execution. // Before returning global lock should be reset. // void obj_storage::load(hnd_t hnd, int flags) { opid_t opid = hnd->opid; internal_assert(opid != 0); object* obj = hnd->obj; int reloading = IS_VALID_OBJECT(obj) ? (obj->state & object_header::reloading) : 0; object_monitor::unlock_global(); critical_section guard(cs); assert(opened); if (!reloading) { object_monitor::lock_global(); if (IS_VALID_OBJECT(hnd->obj)) { // // Object is already loaded by concurrent thread // return; } object_monitor::unlock_global(); } if (flags & lof_auto) { flags = db->fetch_flags; } storage->load(opid, flags, loadBuf); internal_assert(loadBuf.size() != 0); object_monitor::lock_global(); dbs_object_header* hdr = (dbs_object_header*)&loadBuf; dbs_object_header* end = (dbs_object_header*)((char*)hdr + loadBuf.size()); do { cpid_t cpid = hdr->get_cpid(); assert(cpid != 0); // object was found at server class_descriptor* desc = (cpid == RAW_CPID) ? &storage_root::self_class : descriptor_table[cpid]; if (desc == NULL) { object_monitor::unlock_global(); dnm_buffer cls_buf; storage->get_class(cpid, cls_buf); assert(cls_buf.size() != 0); dbs_class_descriptor* dbs_desc = (dbs_class_descriptor*)&cls_buf; dbs_desc->unpack(); object_monitor::lock_global(); desc = class_descriptor::find(dbs_desc->name()); assert(desc != NULL); if (*desc->dbs_desc != *dbs_desc) { // // Class was changed // object_monitor::unlock_global(); cpid_t new_cpid = storage->put_class(desc->dbs_desc); object_monitor::lock_global(); descriptor_table[new_cpid] = desc; cpid_table[desc->ctid] = new_cpid; desc = NEW class_descriptor(new_cpid, desc, dbs_desc); descriptor_table[cpid] = desc; } else { descriptor_table[cpid] = desc; cpid_table[desc->ctid] = cpid; } } opid_t id = hdr->get_opid(); desc->unpack(hdr, (id == opid) ? hnd : object_handle::create_persistent_reference(this, id, 0)); hdr = (dbs_object_header*)((char*)hdr + sizeof(dbs_object_header) + hdr->get_size()); } while (hdr != end); loadBuf.truncate(DNM_BUFFER_WATERMARK); } // // Start transaction. // void obj_storage::begin_transaction() { storage->begin_transaction(transBuf); } // // Appened objects to transaction buffer. // Global mutex should be locked. // void obj_storage::include_object_in_transaction(hnd_t hnd, int flags) { dbs_object_header* hdr; object* obj = hnd->obj; if (obj->cpid == 0) { // // Objects made persistent by 'become' operator may have no assigned // persistent class indentifier. // obj->cpid = hnd->storage->get_cpid_by_descriptor(&obj->cls); } if (obj->cpid == RAW_CPID) { flags &= ~tof_update; } if (flags & tof_update) { class_descriptor* desc = descriptor_table[obj->cpid]; size_t size = desc->packed_size((char*)obj, obj->size) + sizeof(dbs_object_header); hdr = (dbs_object_header*)transBuf.append(size); desc->pack(hdr, hnd); } else { hdr = (dbs_object_header*)transBuf.append(sizeof(dbs_object_header)); hdr->set_opid(hnd->opid); hdr->set_cpid(obj->cpid); hdr->set_size(0); } hdr->set_flags(flags); } // // Commit local transaction or send part of global transaction // to coordinator. // boolean obj_storage::commit_coordinator_transaction(int n_trans_servers, stid_t* trans_servers, trid_t& tid) { object_monitor::unlock_global(); if (alloc_buf_pos > 0) { storage->bulk_allocate(alloc_size_buf, alloc_cpid_buf, alloc_buf_pos, alloc_opid_buf, alloc_buf_pos); alloc_buf_pos = 0; } boolean committed = storage->commit_coordinator_transaction(n_trans_servers, trans_servers, transBuf, tid); object_monitor::lock_global(); transBuf.truncate(DNM_BUFFER_WATERMARK); return committed; } // // Send part of global transaction to server. // void obj_storage::commit_transaction(stid_t coordinator, int n_trans_servers, stid_t* trans_servers, trid_t tid) { if (alloc_buf_pos > 0) { storage->bulk_allocate(alloc_size_buf, alloc_cpid_buf, alloc_buf_pos, alloc_opid_buf, alloc_buf_pos); alloc_buf_pos = 0; } storage->commit_transaction(coordinator, n_trans_servers, trans_servers, transBuf, tid); transBuf.truncate(DNM_BUFFER_WATERMARK); } // // Wait for global transation completion status from coordinator. // boolean obj_storage::wait_global_transaction_completion() { object_monitor::unlock_global(); boolean committed = storage->wait_global_transaction_completion(); object_monitor::lock_global(); return committed; } // // Lock function is called by meaobject protocol methods. // Object instance and global mutex are locked at this moment. // Global mutex should be unlocked before sending request to server // and locked again after receiving reply. // boolean obj_storage::lock(opid_t opid, lck_t lck, int attr) { boolean result; internal_assert(lck == lck_shared || lck == lck_exclusive); object_monitor::unlock_global(); assert(opened); { critical_section guard(cs); result = storage->lock(opid, lck, attr); } object_monitor::lock_global(); return result; } opid_t obj_storage::allocate(cpid_t cpid, size_t size, boolean aligned) { int buf_size = db->alloc_buf_size; if (buf_size != 0) { assert(opened); if (alloc_buf_size == 0) { object_monitor::unlock_global(); { critical_section guard(cs); if (alloc_buf_size == 0) { alloc_opid_buf = new opid_t[buf_size]; alloc_size_buf = new size_t[buf_size]; alloc_cpid_buf = new cpid_t[buf_size]; alloc_buf_pos = 0; storage->bulk_allocate(alloc_size_buf, alloc_cpid_buf, 0, alloc_opid_buf, buf_size); alloc_buf_size = buf_size; } } object_monitor::lock_global(); } while (alloc_buf_pos == alloc_buf_size) { object_monitor::unlock_global(); { critical_section guard(cs); if (alloc_buf_pos == alloc_buf_size) { storage->bulk_allocate(alloc_size_buf, alloc_cpid_buf, alloc_buf_size, alloc_opid_buf, alloc_buf_size); alloc_buf_pos = 0; } } object_monitor::lock_global(); } if (aligned) { size |= ALLOC_ALIGNED; } alloc_size_buf[alloc_buf_pos] = size; alloc_cpid_buf[alloc_buf_pos] = cpid; return alloc_opid_buf[alloc_buf_pos++]; } object_monitor::unlock_global(); opid_t opid; assert(opened); { critical_section guard(cs); opid = storage->allocate(cpid, size, aligned); } object_monitor::lock_global(); return opid; } nat8 obj_storage::get_used_size() { nat8 size; assert(opened); { critical_section guard(cs); size = storage->get_used_size(); } return size; } void obj_storage::disconnected(stid_t sid) { db->disconnected(sid); opened = False; } void obj_storage::login_refused(stid_t sid) { db->login_refused(sid); opened = False; } void obj_storage::invalidate(stid_t, opid_t opid) { object_monitor::lock_global(); hnd_t hnd = object_handle::find(this, opid); if (hnd != 0) { if (IS_VALID_OBJECT(hnd->obj)) { if (!cache_manager::cooperative_mode) { task::set_task_specific(db->manager); } hnd->obj->mop->invalidate(hnd); } else { hnd->obj = INVALIDATED_OBJECT; } } object_monitor::unlock_global(); } boolean obj_storage::open(char const* connection_address) { storage = db->create_dbs_storage(id); n_references = 0; if (!storage->open(connection_address)) { delete storage; storage = NULL; return False; } opened = True; n_references = 1; return True; } void obj_storage::close() { boolean was_opened; { critical_section guard(cs); was_opened = opened; if (was_opened) { opened = False; if (storage != NULL) { storage->close(); } } } if (was_opened) { object_monitor::lock_global(); cache_manager::get()->cleanup_cache(this); if (--n_references == 0) { delete this; } object_monitor::unlock_global(); } } obj_storage::obj_storage(database* dbs, stid_t sid) : storage(NULL), db(dbs), id(sid) { alloc_buf_size = 0; alloc_buf_pos = 0; alloc_opid_buf = NULL; alloc_size_buf = NULL; alloc_cpid_buf = NULL; } // // Destructor is called from close or remove_reference methods // with global mutex locked. // obj_storage::~obj_storage() { class_descriptor** dpp = &descriptor_table; int n = descriptor_table.size(); while (--n >= 0) { if (*dpp != NULL) { // // remove class descriptors created only for loading // instances of objects of modified classes // (*dpp)->dealloc(); } dpp += 1; } delete storage; delete[] alloc_opid_buf; delete[] alloc_size_buf; delete[] alloc_cpid_buf; } // // Database (collection of storages) // boolean database::open(const char* database_configuration_file) { char buf[MAX_CFG_FILE_LINE_SIZE]; critical_section guard(cs); if (opened) { return True; } opened = False; FILE* cfg = fopen(database_configuration_file, "r"); if (cfg == NULL) { console::output("Failed to open database configuration file: '%s'\n", database_configuration_file); return False; } if (fgets(buf, sizeof buf, cfg) == NULL || sscanf(buf, "%d", &n_storages) != 1) { console::output("Bad format of configuration file '%s'\n", database_configuration_file); return False; } storages = NEW obj_storage*[n_storages]; memset(storages, 0, n_storages*sizeof(obj_storage*)); opened = True; while (fgets(buf, sizeof buf, cfg)) { int i; char hostname[MAX_CFG_FILE_LINE_SIZE]; if (sscanf(buf, "%d:%s", &i, hostname) == 2) { if (i < n_storages) { if (storages[i] != NULL) { console::output("Duplicated entry in configuration file: " "%s", buf); } storages[i] = NEW obj_storage(this, i); if (!storages[i]->open(hostname)) { fclose(cfg); close(); return False; } } } } fclose(cfg); return True; } dbs_storage* database::create_dbs_storage(stid_t sid) const { return NEW dbs_client_storage(sid, storages[sid]); } void database::close() { critical_section guard(cs); if (opened) { for (int i = n_storages; -- i >= 0;) { if (storages[i] != NULL) { storages[i]->close(); } } delete[] storages; opened = False; } } void database::get_root(object_reference& r, stid_t sid) { object_monitor::lock_global(); assert(opened && sid < n_storages); obj_storage* storage = storages[sid]; assert(storage != NULL); hnd_t hnd = object_handle::create_persistent_reference(storage, ROOT_OPID); r.unlink(); r.hnd = hnd; // // It is not necessary to load this object immediatly // since root object can't never be removed and server // should not know that client has reference to this object // object_monitor::unlock_global(); } void database::disconnected(stid_t sid) { console::error("Server %d is disconnected\n", sid); } void database::login_refused(stid_t sid) { console::error("Authorization procedure fails at server %d\n", sid); } stid_t database::get_storage_with_minimal_size() { if (n_storages != 0) { stid_t min_sid = 0; nat8 min_size = get_used_size(0); for (int i = n_storages; --i > 0;) { nat8 size = get_used_size(i); if (size < min_size) { min_sid = i; min_size = size; } } return min_sid; } return (stid_t)-1; } void database::attach() { if (manager == NULL) { manager = new cache_manager(); } manager->attach(); } void database::detach() { manager->detach(); delete manager; manager = NULL; } database::database() { storages = NULL; n_storages = 0; opened = False; manager = NULL; fetch_flags = lof_none; alloc_buf_size = 0; } void database::enable_clustering(boolean enabled) { fetch_flags = enabled ? lof_cluster : lof_none; } void database::set_alloc_buffer_size(size_t size) { alloc_buf_size = size; } cache_manager* database ::get_cache_manager() { return manager != NULL ? manager : cache_manager::get(); } void database::set_isolation_level(transaction_isolation_level level) { get_cache_manager()->set_isolation_level(level); } database::~database() { delete manager; } END_GOODS_NAMESPACE